2 Commits

Author SHA1 Message Date
xiaochao
03f8f3fa21 Update coordinator.py 2025-07-04 17:44:56 +08:00
xiaochao
7702b96941 优化ssh连接数
增加硬盘信息缓存过期时间
2025-07-03 18:38:39 +08:00
10 changed files with 674 additions and 686 deletions

View File

@@ -1,89 +1,62 @@
import logging import logging
import asyncio
import asyncssh
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv from .const import DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER # 导入新增常量
from .const import (
DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER,
CONF_HOST, DEFAULT_PORT
)
from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
config = {**entry.data, **entry.options} config = {**entry.data, **entry.options}
coordinator = FlynasCoordinator(hass, config, entry)
# 直接初始化不阻塞等待NAS上线 coordinator = FlynasCoordinator(hass, config)
await coordinator.async_config_entry_first_refresh()
_LOGGER.debug("协调器类型: %s", type(coordinator).__name__)
_LOGGER.debug("协调器是否有control_vm方法: %s", hasattr(coordinator, 'control_vm'))
_LOGGER.debug("协调器是否有vm_manager属性: %s", hasattr(coordinator, 'vm_manager'))
# 检查是否启用Docker并初始化Docker管理器如果有
enable_docker = config.get(CONF_ENABLE_DOCKER, False)
if enable_docker:
# 导入Docker管理器并初始化
from .docker_manager import DockerManager
coordinator.docker_manager = DockerManager(coordinator)
_LOGGER.debug("已启用Docker容器监控")
else:
coordinator.docker_manager = None
_LOGGER.debug("未启用Docker容器监控")
ups_coordinator = UPSDataUpdateCoordinator(hass, config, coordinator)
await ups_coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = { hass.data[DOMAIN][entry.entry_id] = {
DATA_UPDATE_COORDINATOR: coordinator, DATA_UPDATE_COORDINATOR: coordinator,
"ups_coordinator": None, "ups_coordinator": ups_coordinator,
CONF_ENABLE_DOCKER: coordinator.config.get(CONF_ENABLE_DOCKER, False) CONF_ENABLE_DOCKER: enable_docker # 存储启用状态
} }
# 异步后台初始化
hass.async_create_task(async_delayed_setup(hass, entry, coordinator)) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_update_entry))
return True return True
async def async_delayed_setup(hass: HomeAssistant, entry: ConfigEntry, coordinator: FlynasCoordinator):
try:
# 不阻塞等待NAS上线直接尝试刷新数据
await coordinator.async_config_entry_first_refresh()
enable_docker = coordinator.config.get(CONF_ENABLE_DOCKER, False)
if enable_docker:
from .docker_manager import DockerManager
coordinator.docker_manager = DockerManager(coordinator)
_LOGGER.debug("已启用Docker容器监控")
else:
coordinator.docker_manager = None
_LOGGER.debug("未启用Docker容器监控")
ups_coordinator = UPSDataUpdateCoordinator(hass, coordinator.config, coordinator)
await ups_coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id]["ups_coordinator"] = ups_coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_update_entry))
_LOGGER.info("飞牛NAS集成初始化完成")
except Exception as e:
_LOGGER.error("飞牛NAS集成初始化失败: %s", str(e))
await coordinator.async_disconnect()
if hasattr(coordinator, '_ping_task') and coordinator._ping_task:
coordinator._ping_task.cancel()
async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry):
"""更新配置项""" await hass.config_entries.async_reload(entry.entry_id)
# 卸载现有集成
await async_unload_entry(hass, entry)
# 重新加载集成
await async_setup_entry(hass, entry)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
"""卸载集成""" unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
# 获取集成数据
domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {})
unload_ok = True
if DATA_UPDATE_COORDINATOR in domain_data: if unload_ok:
domain_data = hass.data[DOMAIN][entry.entry_id]
coordinator = domain_data[DATA_UPDATE_COORDINATOR] coordinator = domain_data[DATA_UPDATE_COORDINATOR]
ups_coordinator = domain_data.get("ups_coordinator") ups_coordinator = domain_data["ups_coordinator"]
# 卸载平台 # 关闭主协调器的SSH连接
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) await coordinator.async_disconnect()
# 关闭UPS协调器
await ups_coordinator.async_shutdown()
if unload_ok: # 从DOMAIN中移除该entry的数据
# 关闭主协调器的SSH连接 hass.data[DOMAIN].pop(entry.entry_id)
await coordinator.async_disconnect()
# 关闭UPS协调器如果存在
if ups_coordinator:
await ups_coordinator.async_shutdown()
# 取消监控任务(如果存在)
if hasattr(coordinator, '_ping_task') and coordinator._ping_task and not coordinator._ping_task.done():
coordinator._ping_task.cancel()
# 从DOMAIN中移除该entry的数据
hass.data[DOMAIN].pop(entry.entry_id, None)
return unload_ok return unload_ok

View File

@@ -18,7 +18,11 @@ from .const import (
CONF_UPS_SCAN_INTERVAL, CONF_UPS_SCAN_INTERVAL,
DEFAULT_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL,
CONF_ROOT_PASSWORD, CONF_ROOT_PASSWORD,
CONF_ENABLE_DOCKER CONF_ENABLE_DOCKER,
CONF_MAX_CONNECTIONS,
DEFAULT_MAX_CONNECTIONS,
CONF_CACHE_TIMEOUT,
DEFAULT_CACHE_TIMEOUT
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -71,7 +75,17 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
default=DEFAULT_SCAN_INTERVAL default=DEFAULT_SCAN_INTERVAL
): int, ): int,
# 添加启用Docker的选项 # 添加启用Docker的选项
vol.Optional(CONF_ENABLE_DOCKER, default=False): bool vol.Optional(CONF_ENABLE_DOCKER, default=False): bool,
# 新增:最大连接数
vol.Optional(
CONF_MAX_CONNECTIONS,
default=DEFAULT_MAX_CONNECTIONS
): int,
# 新增:缓存超时时间(分钟)
vol.Optional(
CONF_CACHE_TIMEOUT,
default=DEFAULT_CACHE_TIMEOUT
): int
}) })
return self.async_show_form( return self.async_show_form(
@@ -104,6 +118,9 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self.ssh_config[CONF_MAC] = selected_mac self.ssh_config[CONF_MAC] = selected_mac
# 确保将CONF_ENABLE_DOCKER也存入配置项 # 确保将CONF_ENABLE_DOCKER也存入配置项
self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker
# 添加连接池和缓存配置
self.ssh_config[CONF_MAX_CONNECTIONS] = self.ssh_config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
self.ssh_config[CONF_CACHE_TIMEOUT] = self.ssh_config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT)
return self.async_create_entry( return self.async_create_entry(
title=self.ssh_config[CONF_HOST], title=self.ssh_config[CONF_HOST],
data=self.ssh_config data=self.ssh_config
@@ -220,7 +237,17 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
vol.Optional( vol.Optional(
CONF_ENABLE_DOCKER, CONF_ENABLE_DOCKER,
default=data.get(CONF_ENABLE_DOCKER, False) default=data.get(CONF_ENABLE_DOCKER, False)
): bool ): bool,
# 新增:最大连接数
vol.Optional(
CONF_MAX_CONNECTIONS,
default=data.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
): int,
# 新增:缓存超时时间(分钟)
vol.Optional(
CONF_CACHE_TIMEOUT,
default=data.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT)
): int
}) })
return self.async_show_form( return self.async_show_form(

View File

@@ -52,4 +52,10 @@ ICON_RESTART = "mdi:restart"
# 设备标识符常量 # 设备标识符常量
DEVICE_ID_NAS = "flynas_nas_system" DEVICE_ID_NAS = "flynas_nas_system"
DEVICE_ID_UPS = "flynas_ups" DEVICE_ID_UPS = "flynas_ups"
CONF_NETWORK_MACS = "network_macs" CONF_NETWORK_MACS = "network_macs"
# 新增配置常量
CONF_MAX_CONNECTIONS = "max_connections"
CONF_CACHE_TIMEOUT = "cache_timeout"
DEFAULT_MAX_CONNECTIONS = 3
DEFAULT_CACHE_TIMEOUT = 30 # 单位:分钟

View File

@@ -1,7 +1,8 @@
import logging import logging
import asyncio
import asyncssh
import re import re
import asyncssh
import asyncio
import time
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -10,7 +11,8 @@ from .const import (
DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD, DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD,
CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL, CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL,
DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL, DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL,
CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER, CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS,
CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT
) )
from .disk_manager import DiskManager from .disk_manager import DiskManager
from .system_manager import SystemManager from .system_manager import SystemManager
@@ -21,10 +23,8 @@ from .docker_manager import DockerManager
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class FlynasCoordinator(DataUpdateCoordinator): class FlynasCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config, config_entry) -> None: def __init__(self, hass: HomeAssistant, config) -> None:
self.config = config self.config = config
self.config_entry = config_entry
self.hass = hass
self.host = config[CONF_HOST] self.host = config[CONF_HOST]
self.port = config.get(CONF_PORT, DEFAULT_PORT) self.port = config.get(CONF_PORT, DEFAULT_PORT)
self.username = config[CONF_USERNAME] self.username = config[CONF_USERNAME]
@@ -32,19 +32,25 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.root_password = config.get(CONF_ROOT_PASSWORD) self.root_password = config.get(CONF_ROOT_PASSWORD)
self.mac = config.get(CONF_MAC, "") self.mac = config.get(CONF_MAC, "")
self.enable_docker = config.get(CONF_ENABLE_DOCKER, False) self.enable_docker = config.get(CONF_ENABLE_DOCKER, False)
self.max_connections = config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
self.cache_timeout = config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60
self.docker_manager = DockerManager(self) if self.enable_docker else None self.docker_manager = DockerManager(self) if self.enable_docker else None
self.ssh = None self.ssh_pool = [] # SSH连接池
self.ssh_closed = True self.active_commands = 0 # 当前活动命令数
# SSH连接池管理 self.ssh_closed = True # 初始状态为关闭
self.ssh_pool = [] self.use_sudo = False # 初始化use_sudo属性
self.ssh_pool_size = 3 # 连接池大小
self.ssh_pool_lock = asyncio.Lock()
self.ups_manager = UPSManager(self)
self.vm_manager = VMManager(self)
self.use_sudo = False
# 确保data始终有初始值 self.data = {
self.data = self.get_default_data() "disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -58,368 +64,262 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.disk_manager = DiskManager(self) self.disk_manager = DiskManager(self)
self.system_manager = SystemManager(self) self.system_manager = SystemManager(self)
self._system_online = False self.ups_manager = UPSManager(self)
self._ping_task = None self.vm_manager = VMManager(self)
self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
# 添加日志方法
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def get_default_data(self):
"""返回默认的数据结构"""
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
_LOGGER.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
_LOGGER.info(message)
def _warning_log(self, message: str):
"""警告日志"""
_LOGGER.warning(message)
def _error_log(self, message: str):
"""错误日志"""
_LOGGER.error(message)
async def get_ssh_connection(self): async def get_ssh_connection(self):
"""从连接池获取可用的SSH连接""" """从连接池获取或创建SSH连接"""
async with self.ssh_pool_lock: # 避免递归调用,改为循环等待
# 检查现有连接 start_time = time.time()
for i, (ssh, in_use) in enumerate(self.ssh_pool): while True:
if not in_use: # 如果连接池中有可用连接且没有超过最大活动命令数
try: while len(self.ssh_pool) > 0 and self.active_commands < self.max_connections:
# 测试连接是否活跃 conn = self.ssh_pool.pop()
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2) if await self.is_connection_alive(conn):
self.ssh_pool[i] = (ssh, True) # 标记为使用中 self.active_commands += 1
self._debug_log(f"复用连接池中的连接 {i}") return conn
return ssh, i else:
except Exception: await self.close_connection(conn)
# 连接失效,移除
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
# 如果连接池未满,创建新连接 # 如果没有可用连接,创建新连接
if len(self.ssh_pool) < self.ssh_pool_size: if self.active_commands < self.max_connections:
try: try:
ssh = await asyncssh.connect( conn = await asyncssh.connect(
self.host, self.host,
port=self.port, port=self.port,
username=self.username, username=self.username,
password=self.password, password=self.password,
known_hosts=None, known_hosts=None,
connect_timeout=5 connect_timeout=10
) )
self.active_commands += 1
self.ssh_closed = False
# 检查并设置权限状态 # 确定是否需要sudo权限
await self._setup_connection_permissions(ssh) await self.determine_sudo_setting(conn)
connection_id = len(self.ssh_pool) return conn
self.ssh_pool.append((ssh, True))
self._debug_log(f"创建新的SSH连接 {connection_id}")
return ssh, connection_id
except Exception as e: except Exception as e:
self._debug_log(f"创建SSH连接失败: {e}") _LOGGER.error("创建SSH连接失败: %s", str(e), exc_info=True)
raise raise UpdateFailed(f"SSH连接失败: {str(e)}")
# 连接池满且所有连接都在使用中,等待可用连接 # 等待0.1秒后重试,避免递归
self._debug_log("所有连接都在使用中,等待可用连接...") await asyncio.sleep(0.1)
for _ in range(50): # 最多等待5秒
await asyncio.sleep(0.1)
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True)
self._debug_log(f"等待后获得连接 {i}")
return ssh, i
except Exception:
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
raise Exception("无法获取SSH连接") # 设置超时30秒
if time.time() - start_time > 30:
raise UpdateFailed("获取SSH连接超时")
async def _setup_connection_permissions(self, ssh): async def determine_sudo_setting(self, conn):
"""为新连接设置权限状态""" """确定是否需要使用sudo权限"""
try: try:
# 检查是否root用户 # 检查当前用户是否root
result = await ssh.run("id -u", timeout=3) result = await conn.run("id -u", timeout=5)
if result.stdout.strip() == "0": if result.stdout.strip() == "0":
self._debug_log("当前用户是 root") _LOGGER.debug("当前用户是root不需要sudo")
self.use_sudo = False self.use_sudo = False
return return
# 尝试切换到root会话
if self.root_password:
try:
await ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami = await ssh.run("whoami")
if "root" in whoami.stdout:
self._info_log("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
return
except Exception:
pass
# 尝试使用登录密码sudo
try:
await ssh.run(
f"echo '{self.password}' | sudo -S -i",
input=self.password + "\n",
timeout=5
)
whoami = await ssh.run("whoami")
if "root" in whoami.stdout:
self._info_log("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False
return
except Exception:
pass
# 设置为使用sudo模式
self.use_sudo = True
self._debug_log("设置为使用sudo模式")
except Exception as e: except Exception as e:
self._debug_log(f"设置连接权限失败: {e}") _LOGGER.warning("检查用户ID失败: %s", str(e))
self.use_sudo = True
# 检查是否可以使用密码sudo
try:
result = await conn.run(
f"echo '{self.password}' | sudo -S whoami",
input=self.password + "\n",
timeout=10
)
if "root" in result.stdout:
_LOGGER.info("可以使用用户密码sudo")
self.use_sudo = True
return
except Exception as e:
_LOGGER.debug("无法使用用户密码sudo: %s", str(e))
# 如果有root密码尝试使用root密码sudo
if self.root_password:
try:
result = await conn.run(
f"echo '{self.root_password}' | sudo -S whoami",
input=self.root_password + "\n",
timeout=10
)
if "root" in result.stdout:
_LOGGER.info("可以使用root密码sudo")
self.use_sudo = True
return
except Exception as e:
_LOGGER.debug("无法使用root密码sudo: %s", str(e))
_LOGGER.warning("无法获取root权限将使用普通用户执行命令")
self.use_sudo = False
async def release_ssh_connection(self, connection_id): async def release_ssh_connection(self, conn):
"""释放SSH连接回连接池""" """释放连接回连接池"""
async with self.ssh_pool_lock: self.active_commands -= 1
if 0 <= connection_id < len(self.ssh_pool): if conn and not conn.is_closed():
ssh, _ = self.ssh_pool[connection_id] if len(self.ssh_pool) < self.max_connections:
self.ssh_pool[connection_id] = (ssh, False) # 标记为可用 self.ssh_pool.append(conn)
self._debug_log(f"释放SSH连接 {connection_id}") else:
await self.close_connection(conn)
async def close_all_ssh_connections(self): else:
"""关闭所有SSH连接""" # 如果连接已经关闭,直接丢弃
async with self.ssh_pool_lock: pass
for ssh, _ in self.ssh_pool:
try: async def close_connection(self, conn):
ssh.close() """关闭SSH连接"""
except: try:
pass if conn and not conn.is_closed():
self.ssh_pool.clear() conn.close()
self._debug_log("已关闭所有SSH连接") except Exception as e:
_LOGGER.debug("关闭SSH连接时出错: %s", str(e))
async def is_connection_alive(self, conn) -> bool:
"""检查连接是否存活"""
try:
# 发送一个简单的命令测试连接
result = await conn.run("echo 'connection_test'", timeout=2)
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError, ConnectionResetError):
return False
async def run_command(self, command: str, retries=2) -> str:
"""使用连接池执行命令"""
conn = None
current_retries = retries
while current_retries >= 0:
try:
conn = await self.get_ssh_connection()
# 根据sudo设置执行命令
if self.use_sudo:
password = self.root_password if self.root_password else self.password
if password:
full_command = f"sudo -S {command}"
result = await conn.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await conn.run(full_command, check=True)
else:
result = await conn.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status)
# 连接可能已损坏,关闭它
await self.close_connection(conn)
conn = None
if current_retries > 0:
current_retries -= 1
continue
else:
raise UpdateFailed(f"Command failed: {command}") from e
except asyncssh.Error as e:
_LOGGER.error("SSH连接错误: %s", str(e))
await self.close_connection(conn)
conn = None
if current_retries > 0:
current_retries -= 1
continue
else:
raise UpdateFailed(f"SSH错误: {str(e)}") from e
except Exception as e:
_LOGGER.error("意外错误: %s", str(e), exc_info=True)
await self.close_connection(conn)
conn = None
if current_retries > 0:
current_retries -= 1
continue
else:
raise UpdateFailed(f"意外错误: {str(e)}") from e
finally:
if conn:
await self.release_ssh_connection(conn)
async def async_connect(self): async def async_connect(self):
"""建立并保持持久SSH连接 - 兼容旧代码""" """建立SSH连接使用连接池"""
try: # 连接池已处理连接,此方法现在主要用于初始化
ssh, connection_id = await self.get_ssh_connection() return True
await self.release_ssh_connection(connection_id)
return True async def is_ssh_connected(self) -> bool:
except Exception: """检查是否有活动的SSH连接"""
return False return len(self.ssh_pool) > 0 or self.active_commands > 0
async def async_disconnect(self): async def async_disconnect(self):
"""断开SSH连接 - 兼容旧代码""" """关闭所有SSH连接"""
await self.close_all_ssh_connections() # 关闭连接池中的所有连接
for conn in self.ssh_pool:
async def run_command(self, command: str, retries=2) -> str: await self.close_connection(conn)
"""执行SSH命令使用连接池""" self.ssh_pool = []
# 系统离线时直接返回空字符串 self.active_commands = 0
if not self._system_online: self.ssh_closed = True
return "" self.use_sudo = False # 重置sudo设置
ssh = None
connection_id = None
try:
# 从连接池获取连接
ssh, connection_id = await self.get_ssh_connection()
# 构建完整命令
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
# 释放连接回连接池
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def run_command_direct(self, command: str) -> str:
"""直接执行命令,获取独立连接 - 用于并发任务"""
if not self._system_online:
return ""
ssh = None
connection_id = None
try:
ssh, connection_id = await self.get_ssh_connection()
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"直接命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self._debug_log(f"启动系统状态监控,每{self._retry_interval}秒检测一次")
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self._info_log("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def ping_system(self) -> bool:
"""轻量级系统状态检测"""
# 对于本地主机直接返回True
if self.host in ['localhost', '127.0.0.1']:
return True
try:
# 使用异步ping检测
proc = await asyncio.create_subprocess_exec(
'ping', '-c', '1', '-W', '1', self.host,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.wait()
return proc.returncode == 0
except Exception:
return False
async def _async_update_data(self): async def _async_update_data(self):
"""数据更新入口,优化命令执行频率""" _LOGGER.debug("Starting data update...")
self._debug_log("开始数据更新...")
is_online = await self.ping_system()
self._system_online = is_online
if not is_online:
self._debug_log("系统离线,跳过数据更新")
# 启动后台监控任务
if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.close_all_ssh_connections()
return self.get_default_data()
# 系统在线处理
try: try:
# 预热连接池并确保权限设置正确 if await self.is_ssh_connected():
await self.async_connect() status = "on"
else:
if not await self.async_connect():
status = "off"
else:
status = "on"
# 获取系统状态信息 # 使用已初始化的管理器获取数据
status = "on"
# 串行获取信息以确保稳定性
self._debug_log("开始获取系统信息...")
system = await self.system_manager.get_system_info()
self._debug_log("系统信息获取完成")
self._debug_log("开始获取磁盘信息...")
disks = await self.disk_manager.get_disks_info() disks = await self.disk_manager.get_disks_info()
self._debug_log(f"磁盘信息获取完成,数量: {len(disks)}") system = await self.system_manager.get_system_info()
self._debug_log("开始获取UPS信息...")
ups_info = await self.ups_manager.get_ups_info() ups_info = await self.ups_manager.get_ups_info()
self._debug_log("UPS信息获取完成")
self._debug_log("开始获取虚拟机信息...")
vms = await self.vm_manager.get_vm_list() vms = await self.vm_manager.get_vm_list()
self._debug_log(f"虚拟机信息获取完成,数量: {len(vms)}")
# 为每个虚拟机获取标题 # 获取虚拟机标题
for vm in vms: for vm in vms:
try: vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
except Exception as e: # 获取Docker容器信息如果启用
self._debug_log(f"获取VM标题失败 {vm['name']}: {e}")
vm["title"] = vm["name"]
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker and self.docker_manager: if self.enable_docker and hasattr(self, 'docker_manager') and self.docker_manager:
self._debug_log("开始获取Docker信息...") docker_containers = await self.docker_manager.get_containers()
try:
docker_containers = await self.docker_manager.get_containers()
self._debug_log(f"Docker信息获取完成数量: {len(docker_containers)}")
except Exception as e:
self._debug_log(f"Docker信息获取失败: {e}")
data = { data = {
"disks": disks, "disks": disks,
"system": {**system, "status": status}, "system": {
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers
} }
self._debug_log(f"数据更新完成: disks={len(disks)}, vms={len(vms)}, containers={len(docker_containers)}")
return data return data
except Exception as e: except Exception as e:
self._error_log(f"数据更新失败: {str(e)}") _LOGGER.error("Failed to update data: %s", str(e), exc_info=True)
self._system_online = False return {
if not self._ping_task or self._ping_task.done(): "disks": [],
self._ping_task = asyncio.create_task(self._monitor_system_status()) "system": {
"uptime": "未知",
return self.get_default_data() "cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
async def reboot_system(self):
await self.system_manager.reboot_system()
async def shutdown_system(self):
await self.system_manager.shutdown_system()
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
self.async_update_listeners()
class UPSDataUpdateCoordinator(DataUpdateCoordinator): class UPSDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config, main_coordinator): def __init__(self, hass: HomeAssistant, config, main_coordinator):
@@ -439,20 +339,19 @@ class UPSDataUpdateCoordinator(DataUpdateCoordinator):
self.ups_manager = UPSManager(main_coordinator) self.ups_manager = UPSManager(main_coordinator)
async def _async_update_data(self): async def _async_update_data(self):
# 如果主协调器检测到系统离线跳过UPS更新
if not self.main_coordinator._system_online:
return {}
try: try:
return await self.ups_manager.get_ups_info() return await self.ups_manager.get_ups_info()
except Exception as e: except Exception as e:
_LOGGER.debug("UPS数据更新失败: %s", str(e)) _LOGGER.error("Failed to update UPS data: %s", str(e), exc_info=True)
return {} return {}
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
try: try:
result = await self.main_coordinator.vm_manager.control_vm(vm_name, action) if not hasattr(self, 'vm_manager'):
self.vm_manager = VMManager(self)
result = await self.vm_manager.control_vm(vm_name, action)
return result return result
except Exception as e: except Exception as e:
_LOGGER.debug("虚拟机控制失败: %s", str(e)) _LOGGER.error("虚拟机控制失败: %s", str(e), exc_info=True)
return False return False

View File

@@ -1,7 +1,8 @@
import re import re
import logging import logging
import asyncio import asyncio
from .const import CONF_IGNORE_DISKS import time
from .const import CONF_IGNORE_DISKS, CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -14,7 +15,10 @@ class DiskManager:
self.disk_full_info_cache = {} # 缓存磁盘完整信息 self.disk_full_info_cache = {} # 缓存磁盘完整信息
self.first_run = True # 首次运行标志 self.first_run = True # 首次运行标志
self.initial_detection_done = False # 首次完整检测完成标志 self.initial_detection_done = False # 首次完整检测完成标志
self.cache_expiry = {} # 缓存过期时间(时间戳)
# 获取缓存超时配置(分钟),转换为秒
self.cache_timeout = self.coordinator.config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60
def extract_value(self, text: str, patterns, default="未知", format_func=None): def extract_value(self, text: str, patterns, default="未知", format_func=None):
if not text: if not text:
return default return default
@@ -38,7 +42,6 @@ class DiskManager:
async def check_disk_active(self, device: str, window: int = 30) -> bool: async def check_disk_active(self, device: str, window: int = 30) -> bool:
"""检查硬盘在指定时间窗口内是否有活动""" """检查硬盘在指定时间窗口内是否有活动"""
try: try:
# 正确的路径是 /sys/block/{device}/stat
stat_path = f"/sys/block/{device}/stat" stat_path = f"/sys/block/{device}/stat"
# 读取统计文件 # 读取统计文件
@@ -91,8 +94,6 @@ class DiskManager:
stats = stat_output.split() stats = stat_output.split()
if len(stats) >= 11: if len(stats) >= 11:
# 第9个字段是最近完成的读操作数
# 第10个字段是最近完成的写操作数
recent_reads = int(stats[8]) recent_reads = int(stats[8])
recent_writes = int(stats[9]) recent_writes = int(stats[9])
@@ -108,6 +109,14 @@ class DiskManager:
async def get_disks_info(self) -> list[dict]: async def get_disks_info(self) -> list[dict]:
disks = [] disks = []
try: try:
# 清理过期缓存
now = time.time()
for device in list(self.disk_full_info_cache.keys()):
if now - self.cache_expiry.get(device, 0) > self.cache_timeout:
self.logger.debug(f"磁盘 {device} 的缓存已过期,清除")
del self.disk_full_info_cache[device]
del self.cache_expiry[device]
self.logger.debug("Fetching disk list...") self.logger.debug("Fetching disk list...")
lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE") lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE")
self.logger.debug("lsblk output: %s", lsblk_output) self.logger.debug("lsblk output: %s", lsblk_output)
@@ -148,14 +157,15 @@ class DiskManager:
# 检查是否有缓存的完整信息 # 检查是否有缓存的完整信息
cached_info = self.disk_full_info_cache.get(device, {}) cached_info = self.disk_full_info_cache.get(device, {})
# 优化点:首次运行时强制获取完整信息 # 首次运行时强制获取完整信息
if self.first_run: if self.first_run:
self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息") self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息")
try: try:
# 执行完整的信息获取 # 执行完整的信息获取
await self._get_full_disk_info(disk_info, device_path) await self._get_full_disk_info(disk_info, device_path)
# 更新缓存 # 更新缓存并设置过期时间
self.disk_full_info_cache[device] = disk_info.copy() self.disk_full_info_cache[device] = disk_info.copy()
self.cache_expiry[device] = now
except Exception as e: except Exception as e:
self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True) self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True)
# 使用缓存信息(如果有) # 使用缓存信息(如果有)
@@ -206,8 +216,9 @@ class DiskManager:
try: try:
# 执行完整的信息获取 # 执行完整的信息获取
await self._get_full_disk_info(disk_info, device_path) await self._get_full_disk_info(disk_info, device_path)
# 更新缓存 # 更新缓存并设置过期时间
self.disk_full_info_cache[device] = disk_info.copy() self.disk_full_info_cache[device] = disk_info.copy()
self.cache_expiry[device] = now
except Exception as e: except Exception as e:
self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True) self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True)
# 使用缓存信息(如果有) # 使用缓存信息(如果有)

View File

@@ -1,7 +1,7 @@
{ {
"domain": "fn_nas", "domain": "fn_nas",
"name": "飞牛NAS", "name": "飞牛NAS",
"version": "1.3.6", "version": "1.3.3",
"documentation": "https://github.com/anxms/fn_nas", "documentation": "https://github.com/anxms/fn_nas",
"dependencies": [], "dependencies": [],
"codeowners": ["@anxms"], "codeowners": ["@anxms"],

View File

@@ -1,5 +1,7 @@
import re
import logging import logging
import asyncio import asyncio
import json
import os import os
from datetime import datetime from datetime import datetime
@@ -9,41 +11,10 @@ class SystemManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("system_manager") self.logger = _LOGGER.getChild("system_manager")
# 根据Home Assistant的日志级别动态设置 self.logger.setLevel(logging.DEBUG)
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO) self.debug_enabled = False # 调试模式开关
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式 self.sensors_debug_path = "/config/fn_nas_debug" # 调试文件保存路径
self.sensors_debug_path = "/config/fn_nas_debug"
# 温度传感器缓存
self.cpu_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"driver_type": None,
"label": None
}
self.mobo_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"label": None
}
def _debug_log(self, message: str):
"""Log ``message`` at DEBUG level, but only when ``self.debug_enabled`` is truthy."""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""Log ``message`` at INFO level through this manager's logger."""
self.logger.info(message)
def _warning_log(self, message: str):
"""Log ``message`` at WARNING level through this manager's logger."""
self.logger.warning(message)
def _error_log(self, message: str):
"""Log ``message`` at ERROR level through this manager's logger."""
self.logger.error(message)
async def get_system_info(self) -> dict: async def get_system_info(self) -> dict:
"""获取系统信息""" """获取系统信息"""
system_info = {} system_info = {}
@@ -52,8 +23,10 @@ class SystemManager:
uptime_output = await self.coordinator.run_command("cat /proc/uptime") uptime_output = await self.coordinator.run_command("cat /proc/uptime")
if uptime_output: if uptime_output:
try: try:
# 保存原始秒数
uptime_seconds = float(uptime_output.split()[0]) uptime_seconds = float(uptime_output.split()[0])
system_info["uptime_seconds"] = uptime_seconds system_info["uptime_seconds"] = uptime_seconds
# 保存格式化字符串
system_info["uptime"] = self.format_uptime(uptime_seconds) system_info["uptime"] = self.format_uptime(uptime_seconds)
except (ValueError, IndexError): except (ValueError, IndexError):
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
@@ -61,20 +34,43 @@ class SystemManager:
else: else:
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
system_info["uptime"] = "未知" system_info["uptime"] = "未知"
# 一次性获取CPU和主板温度 # 获取 sensors 命令输出使用JSON格式
temps = await self.get_temperatures_from_sensors() sensors_output = await self.coordinator.run_command(
system_info["cpu_temperature"] = temps["cpu"] "sensors -j 2>/dev/null || sensors 2>/dev/null || echo 'No sensor data'"
system_info["motherboard_temperature"] = temps["motherboard"] )
# 保存传感器数据以便调试
self.save_sensor_data_for_debug(sensors_output)
self.logger.debug("Sensors output: %s", sensors_output[:500] + "..." if len(sensors_output) > 500 else sensors_output)
# 提取 CPU 温度(改进算法)
cpu_temp = self.extract_cpu_temp(sensors_output)
system_info["cpu_temperature"] = cpu_temp
# 提取主板温度(改进算法)
mobo_temp = self.extract_mobo_temp(sensors_output)
system_info["motherboard_temperature"] = mobo_temp
# 尝试备用方法获取CPU温度
if cpu_temp == "未知":
backup_cpu_temp = await self.get_cpu_temp_fallback()
if backup_cpu_temp:
system_info["cpu_temperature"] = backup_cpu_temp
# 新增:获取内存信息
mem_info = await self.get_memory_info() mem_info = await self.get_memory_info()
system_info.update(mem_info) system_info.update(mem_info)
# 新增:获取存储卷信息
vol_info = await self.get_vol_usage() vol_info = await self.get_vol_usage()
system_info["volumes"] = vol_info system_info["volumes"] = vol_info
return system_info return system_info
except Exception as e: except Exception as e:
self.logger.error("Error getting system info: %s", str(e)) self.logger.error("Error getting system info: %s", str(e))
# 在异常处理中返回空数据
return { return {
"uptime_seconds": 0, "uptime_seconds": 0,
"uptime": "未知", "uptime": "未知",
@@ -85,165 +81,71 @@ class SystemManager:
"memory_available": "未知", "memory_available": "未知",
"volumes": {} "volumes": {}
} }
async def get_temperatures_from_sensors(self) -> dict: def save_sensor_data_for_debug(self, sensors_output: str):
"""一次性获取CPU和主板温度""" """保存传感器数据以便调试"""
if not self.debug_enabled:
return
try: try:
command = "sensors" # 创建调试目录
self._debug_log(f"执行sensors命令获取温度: {command}") if not os.path.exists(self.sensors_debug_path):
os.makedirs(self.sensors_debug_path)
sensors_output = await self.coordinator.run_command(command) # 生成文件名
if self.debug_enabled: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self._debug_log(f"sensors命令输出长度: {len(sensors_output) if sensors_output else 0}") filename = os.path.join(self.sensors_debug_path, f"sensors_{timestamp}.log")
if not sensors_output: # 写入文件
self._warning_log("sensors命令无输出") with open(filename, "w") as f:
return {"cpu": "未知", "motherboard": "未知"} f.write(sensors_output)
# 同时解析CPU和主板温度
cpu_temp = self.extract_cpu_temp_from_sensors(sensors_output)
mobo_temp = self.extract_mobo_temp_from_sensors(sensors_output)
# 记录获取结果
if cpu_temp != "未知":
self._info_log(f"通过sensors获取CPU温度成功: {cpu_temp}")
else:
self._warning_log("sensors命令未找到CPU温度")
if mobo_temp != "未知":
self._info_log(f"通过sensors获取主板温度成功: {mobo_temp}")
else:
self._warning_log("sensors命令未找到主板温度")
return {"cpu": cpu_temp, "motherboard": mobo_temp}
self.logger.info("Saved sensors output to %s for debugging", filename)
except Exception as e: except Exception as e:
self._error_log(f"使用sensors命令获取温度失败: {e}") self.logger.error("Failed to save sensor data: %s", str(e))
return {"cpu": "未知", "motherboard": "未知"}
async def get_cpu_temp_fallback(self) -> str:
async def get_cpu_temp_from_kernel(self) -> str: """备用方法获取CPU温度"""
"""获取CPU温度 - 向后兼容""" self.logger.info("Trying fallback methods to get CPU temperature")
temps = await self.get_temperatures_from_sensors()
return temps["cpu"] # 方法1: 从/sys/class/thermal读取
async def get_mobo_temp_from_kernel(self) -> str:
"""获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
async def get_cpu_temp_from_sensors(self) -> str:
"""使用sensors命令获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["cpu"]
async def get_mobo_temp_from_sensors(self) -> str:
"""使用sensors命令获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
def extract_cpu_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取CPU温度"""
try: try:
lines = sensors_output.split('\n') for i in range(5): # 检查前5个可能的传感器
self._debug_log(f"解析sensors输出{len(lines)}") path = f"/sys/class/thermal/thermal_zone{i}/temp"
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
for i, line in enumerate(lines): if output and output.isdigit():
line_lower = line.lower().strip() temp = float(output) / 1000.0
if self.debug_enabled: self.logger.info("Found CPU temperature via thermal zone: %.1f°C", temp)
self._debug_log(f"{i+1}行: {line_lower}") return f"{temp:.1f} °C"
except Exception:
# AMD CPU温度关键词 pass
if any(keyword in line_lower for keyword in [
"tctl", "tdie", "k10temp" # 方法2: 从hwmon设备读取
]):
self._debug_log(f"找到AMD CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取AMD CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析AMD温度失败: {e}")
continue
# Intel CPU温度关键词
if any(keyword in line_lower for keyword in [
"package id", "core 0", "coretemp"
]) and not any(exclude in line_lower for exclude in ["fan"]):
self._debug_log(f"找到Intel CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取Intel CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析Intel温度失败: {e}")
continue
# 通用CPU温度模式
if ('cpu' in line_lower or 'processor' in line_lower) and '+' in line and '°c' in line_lower:
self._debug_log(f"找到通用CPU温度行: {line}")
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取通用CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析通用CPU温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到CPU温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors CPU温度输出失败: {e}")
return "未知"
def extract_mobo_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取主板温度"""
try: try:
lines = sensors_output.split('\n') for i in range(5): # 检查前5个可能的hwmon设备
self._debug_log(f"解析主板温度,共{len(lines)}") for j in range(5): # 检查每个设备的前5个温度传感器
path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
for i, line in enumerate(lines): output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
line_lower = line.lower().strip() if output and output.isdigit():
temp = float(output) / 1000.0
# 主板温度关键词 self.logger.info("Found CPU temperature via hwmon: %.1f°C", temp)
if any(keyword in line_lower for keyword in [ return f"{temp:.1f} °C"
"motherboard", "mobo", "mb", "system", "chipset", except Exception:
"ambient", "temp1:", "temp2:", "temp3:", "systin" pass
]) and not any(cpu_keyword in line_lower for cpu_keyword in [
"cpu", "core", "package", "processor", "tctl", "tdie" # 方法3: 使用psutil库如果可用
]) and not any(exclude in line_lower for exclude in ["fan", "rpm"]): try:
output = await self.coordinator.run_command("python3 -c 'import psutil; print(psutil.sensors_temperatures().get(\"coretemp\")[0].current)' 2>/dev/null")
self._debug_log(f"找到可能的主板温度行: {line}") if output and output.replace('.', '', 1).isdigit():
temp = float(output)
if '+' in line and '°c' in line_lower: self.logger.info("Found CPU temperature via psutil: %.1f°C", temp)
try: return f"{temp:.1f} °C"
temp_match = line.split('+')[1].split('°')[0].strip() except Exception:
temp = float(temp_match) pass
# 主板温度通常在15-70度之间
if 15 <= temp <= 70: self.logger.warning("All fallback methods failed to get CPU temperature")
self._info_log(f"从sensors提取主板温度: {temp:.1f}°C") return ""
return f"{temp:.1f} °C"
else:
self._debug_log(f"主板温度值超出合理范围: {temp:.1f}°C")
except (ValueError, IndexError) as e:
self._debug_log(f"解析主板温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到主板温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors主板温度输出失败: {e}")
return "未知"
def format_uptime(self, seconds: float) -> str: def format_uptime(self, seconds: float) -> str:
"""格式化运行时间为易读格式""" """格式化运行时间为易读格式"""
try: try:
@@ -264,6 +166,223 @@ class SystemManager:
self.logger.error("Failed to format uptime: %s", str(e)) self.logger.error("Failed to format uptime: %s", str(e))
return "未知" return "未知"
def extract_cpu_temp(self, sensors_output: str) -> str:
    """Extract the CPU temperature from ``sensors`` output.

    Handles both the plain-text output of ``sensors`` and the JSON document
    printed by ``sensors -j``. The JSON document nests readings three levels
    deep (chip -> feature label -> sub-feature), for example
    ``{"coretemp-isa-0000": {"Package id 0": {"temp1_input": 45.0}}}``, so
    labels such as "Package id 0" or "Tdie" are matched at the feature level.
    A chip-level match with numeric values directly below it is still
    accepted for backward compatibility.

    Lookup order:
      1. "Package id 0" in plain-text output.
      2. JSON: a feature whose label mentions package/pkg/physical, then the
         Tdie (preferred) or Tctl feature of a k10temp chip.
      3. The remaining plain-text label patterns.

    Args:
        sensors_output: Raw output of the sensors command; may be empty or
            ``None``.

    Returns:
        The temperature formatted like ``"45.0 °C"``, or ``"未知"`` when no
        CPU reading can be identified.
    """
    unknown = "未知"
    if not sensors_output:
        return unknown

    text_patterns = (
        r'Package id 0:\s*\+?(\d+\.?\d*)°C',
        r'CPU Temperature:\s*\+?(\d+\.?\d*)°C',
        r'cpu_thermal:\s*\+?(\d+\.?\d*)°C',
        r'Tdie:\s*\+?(\d+\.?\d*)°C',  # AMD CPU
        r'Tctl:\s*\+?(\d+\.?\d*)°C',  # AMD CPU
        r'PECI Agent \d:\s*\+?(\d+\.?\d*)°C',
        r'Composite:\s*\+?(\d+\.?\d*)°C',
        r'CPU\s+Temp:\s*\+?(\d+\.?\d*)°C',
        r'k10temp-pci\S*:\s*\+?(\d+\.?\d*)°C',
        r'Physical id 0:\s*\+?(\d+\.?\d*)°C',
    )
    limit_markers = ("crit", "max", "min", "alarm")

    def render(value):
        return f"{value:.1f} °C"

    def search_text(patterns):
        """Return the first temperature matched by ``patterns``, or None."""
        for pattern in patterns:
            found = re.search(pattern, sensors_output, re.IGNORECASE)
            if found:
                value = float(found.group(1))
                self.logger.debug("匹配到CPU温度: %s: %.1f°C", pattern, value)
                return value
        return None

    def reading_of(node):
        """Return the live reading held by a JSON node, or None."""
        # bool is a subclass of int and is never a temperature reading.
        if isinstance(node, bool):
            return None
        if isinstance(node, (int, float)):
            return float(node)
        if not isinstance(node, dict):
            return None
        numeric = {
            str(name).lower(): float(value)
            for name, value in node.items()
            if isinstance(value, (int, float)) and not isinstance(value, bool)
        }
        # Prefer the live value (tempN_input) over thresholds of the feature.
        for name, value in numeric.items():
            if name == "input" or (name.startswith("temp") and name.endswith("_input")):
                return value
        for name, value in numeric.items():
            if ("temp" in name or "input" in name) and not any(
                marker in name for marker in limit_markers
            ):
                return value
        return None

    def search_json(data):
        """Walk the chip -> feature tree and return the CPU reading, or None."""
        nodes = []
        for chip, features in data.items():
            chip_name = str(chip).lower()
            # The chip entry itself is kept as a candidate so a flat layout
            # (label -> readings) keeps working.
            nodes.append((chip_name, chip_name, features))
            if isinstance(features, dict):
                for label, node in features.items():
                    nodes.append((chip_name, str(label).lower(), node))

        for chip_name, label, node in nodes:
            if any(kw in label for kw in ("package", "pkg", "physical")):
                value = reading_of(node)
                if value is not None:
                    self.logger.debug(
                        "JSON中找到Package温度: %s/%s = %.1f°C", chip_name, label, value
                    )
                    return value

        # AMD: Tdie is the die temperature; Tctl may carry a control offset.
        for wanted in ("tdie", "tctl"):
            for chip_name, label, node in nodes:
                if "k10temp" in chip_name and wanted in label:
                    value = reading_of(node)
                    if value is not None:
                        self.logger.debug(
                            "JSON中找到Tdie/Tctl温度: %s/%s = %.1f°C", chip_name, label, value
                        )
                        return value
        return None

    temp = search_text(text_patterns[:1])

    if temp is None and sensors_output.lstrip().startswith('{'):
        try:
            data = json.loads(sensors_output)
        except ValueError as err:
            self.logger.warning("JSON解析失败: %s", err)
        else:
            if isinstance(data, dict):
                temp = search_json(data)

    if temp is None:
        temp = search_text(text_patterns[1:])

    return unknown if temp is None else render(temp)
def extract_temp_from_systin(self, systin_data: dict) -> "float | None":
    """Read the temperature value out of a SYSTIN-style sensor mapping.

    The keys ``temp1_input``, ``input`` and ``value`` are tried first, in
    that order. If none of them yields a number, any ``tempN_input`` key is
    accepted, because the channel index of a feature is not always 1.

    Args:
        systin_data: Mapping of sub-feature names to values. Anything that
            is not a non-empty dict is treated as "no data".

    Returns:
        The temperature as a float, or ``None`` when no usable value exists.
    """
    if not isinstance(systin_data, dict) or not systin_data:
        return None

    def to_float(value):
        """Convert ``value`` to float; return None when that is impossible."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    for key in ("temp1_input", "input", "value"):
        temp = to_float(systin_data.get(key))
        if temp is not None:
            return temp

    # Fall back to whichever channel index this feature uses.
    for key, value in systin_data.items():
        name = str(key).lower()
        if name.startswith("temp") and name.endswith("_input"):
            temp = to_float(value)
            if temp is not None:
                return temp
    return None
def extract_mobo_temp(self, sensors_output: str) -> str:
    """Extract the motherboard temperature from ``sensors`` output.

    Handles both the plain-text output of ``sensors`` and the JSON document
    printed by ``sensors -j``. The JSON document nests readings as
    chip -> feature label -> sub-feature, so board labels (SYSTIN,
    "Motherboard", "Chipset", ...) are matched on the chip name plus feature
    label, and only the live ``tempN_input`` value of a feature is used
    (never its limits, never fan or voltage inputs).

    Lookup order:
      1. JSON: the first SYSTIN feature, else the mean of all features whose
         name contains a board keyword and no CPU keyword.
      2. Text: SYSTIN / "System Temp" patterns, then any line mentioning them.
      3. Text: the mean of all other known board labels.
      4. Text fallback: the mean of every remaining labelled reading between
         0 and 80 °C that is not on a CPU-related line and whose label does
         not look like a fan, voltage or limit. The label filter is applied
         to the text before the colon only, so a reading such as "+52.0°C"
         is not mistaken for a "+5" volt rail and an inline
         "(crit = ...)" limit does not hide the reading on the same line.

    Args:
        sensors_output: Raw output of the sensors command; may be empty or
            ``None``.

    Returns:
        The temperature formatted like ``"35.0 °C"``, or ``"未知"`` when no
        board reading can be identified.
    """
    unknown = "未知"
    if not sensors_output:
        return unknown

    board_keywords = (
        "system", "motherboard", "mb", "board", "pch",
        "chipset", "sys", "baseboard", "systin",
    )
    cpu_keywords = (
        "core", "cpu", "package", "tccd", "k10temp",
        "processor", "amd", "intel", "nvme",
    )
    # "smbus" contains "mb" and, like PECI, Tctl and Tdie, is not a board label.
    json_cpu_keywords = cpu_keywords + ("tctl", "tdie", "smbus", "peci")
    label_skip_keywords = (
        "fan", "volt", "vin", "+3.3", "+5", "+12",
        "vdd", "power", "crit", "max", "min",
    )

    def render(value):
        return f"{value:.1f} °C"

    def average(values):
        return sum(values) / len(values)

    def live_reading(node):
        """Return the live temperature stored in a feature dict, or None."""
        if not isinstance(node, dict):
            return None
        for name, value in node.items():
            lowered = str(name).lower()
            is_temp_input = lowered in ("input", "value") or (
                lowered.startswith("temp") and lowered.endswith("_input")
            )
            # bool is a subclass of int and is never a temperature reading.
            if (
                is_temp_input
                and isinstance(value, (int, float))
                and not isinstance(value, bool)
            ):
                return float(value)
        return None

    # Step 1: JSON output of `sensors -j`.
    if sensors_output.lstrip().startswith('{'):
        data = None
        try:
            data = json.loads(sensors_output)
        except ValueError as err:
            self.logger.warning("Failed to parse sensors JSON: %s", err)

        if isinstance(data, dict):
            systin_temp = None
            candidates = []
            for chip, features in data.items():
                if not isinstance(features, dict):
                    continue
                # The chip entry itself is kept as a candidate so a flat
                # layout (label -> readings) keeps working.
                named_nodes = [(str(chip), features)]
                named_nodes.extend(
                    (f"{chip} {label}", node) for label, node in features.items()
                )
                for name, node in named_nodes:
                    reading = live_reading(node)
                    if reading is None:
                        continue
                    lowered = name.lower()
                    if "systin" in lowered:
                        if systin_temp is None:
                            systin_temp = reading
                    elif any(kw in lowered for kw in board_keywords) and not any(
                        kw in lowered for kw in json_cpu_keywords
                    ):
                        candidates.append(reading)
                        self.logger.debug(
                            "Found mobo temp candidate in JSON: %s = %.1f°C", name, reading
                        )
            if systin_temp is not None:
                return render(systin_temp)
            if candidates:
                return render(average(candidates))

    # Step 2: SYSTIN / System Temp in plain text.
    systin_patterns = (
        r'SYSTIN:\s*[+\-]?\s*(\d+\.?\d*)\s*°C',
        r'SYSTIN[:\s]+[+\-]?\s*(\d+\.?\d*)\s*°C',
        r'System Temp:\s*[+\-]?\s*(\d+\.?\d*)\s*°C',
    )
    for pattern in systin_patterns:
        found = re.search(pattern, sensors_output, re.IGNORECASE)
        if found:
            temp = float(found.group(1))
            self.logger.debug("Found SYSTIN temperature: %.1f°C", temp)
            return render(temp)

    lines = sensors_output.splitlines()
    for line in lines:
        if 'SYSTIN' in line or 'System Temp' in line:
            found = re.search(r'[+\-]?\s*(\d+\.?\d*)\s*°C', line)
            if found:
                temp = float(found.group(1))
                self.logger.debug("Found mobo temp in line: %s: %.1f°C", line.strip(), temp)
                return render(temp)

    # Step 3: other well-known board labels, averaged.
    other_patterns = (
        r'System Temp:\s*\+?(\d+\.?\d*)°C',
        r'MB Temperature:\s*\+?(\d+\.?\d*)°C',
        r'Motherboard:\s*\+?(\d+\.?\d*)°C',
        r'SYS Temp:\s*\+?(\d+\.?\d*)°C',
        r'Board Temp:\s*\+?(\d+\.?\d*)°C',
        r'PCH_Temp:\s*\+?(\d+\.?\d*)°C',
        r'Chipset:\s*\+?(\d+\.?\d*)°C',
        r'Baseboard Temp:\s*\+?(\d+\.?\d*)°C',
        r'System Temperature:\s*\+?(\d+\.?\d*)°C',
        r'Mainboard Temp:\s*\+?(\d+\.?\d*)°C',
    )
    temp_values = []
    for pattern in other_patterns:
        for found in re.finditer(pattern, sensors_output, re.IGNORECASE):
            temp = float(found.group(1))
            temp_values.append(temp)
            self.logger.debug(
                "Found motherboard temperature with pattern: %s: %.1f°C", pattern, temp
            )
    if temp_values:
        return render(average(temp_values))

    # Step 4: last resort, scan every labelled temperature line.
    fallback_candidates = []
    for line in lines:
        if '°C' not in line:
            continue
        if any(kw in line.lower() for kw in cpu_keywords):
            continue
        label, separator, rest = line.partition(':')
        if not separator:
            # Continuation lines such as "(crit = +100.0°C)" carry limits only.
            continue
        if any(kw in label.lower() for kw in label_skip_keywords):
            continue
        found = re.search(r'(\d+\.?\d*)\s*°C', rest)
        if not found:
            continue
        temp = float(found.group(1))
        # Plausibility window for a board sensor.
        if 0 < temp < 80:
            fallback_candidates.append(temp)
            self.logger.debug("Fallback mobo candidate: %s -> %.1f°C", line.strip(), temp)

    if fallback_candidates:
        self.logger.warning("Using fallback motherboard temperature detection")
        return render(average(fallback_candidates))

    return unknown
async def get_memory_info(self) -> dict: async def get_memory_info(self) -> dict:
"""获取内存使用信息""" """获取内存使用信息"""
try: try:
@@ -289,7 +408,7 @@ class SystemManager:
} }
except Exception as e: except Exception as e:
self._error_log(f"获取内存信息失败: {str(e)}") self.logger.error("获取内存信息失败: %s", str(e))
return {} return {}
async def get_vol_usage(self) -> dict: async def get_vol_usage(self) -> dict:
@@ -379,28 +498,28 @@ class SystemManager:
async def reboot_system(self): async def reboot_system(self):
"""重启系统""" """重启系统"""
self._info_log("Initiating system reboot...") self.logger.info("Initiating system reboot...")
try: try:
await self.coordinator.run_command("sudo reboot") await self.coordinator.run_command("sudo reboot")
self._info_log("Reboot command sent") self.logger.info("Reboot command sent")
if "system" in self.coordinator.data: if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "rebooting" self.coordinator.data["system"]["status"] = "rebooting"
self.coordinator.async_update_listeners() self.coordinator.async_update_listeners()
except Exception as e: except Exception as e:
self._error_log(f"Failed to reboot system: {str(e)}") self.logger.error("Failed to reboot system: %s", str(e))
raise raise
async def shutdown_system(self): async def shutdown_system(self):
"""关闭系统""" """关闭系统"""
self._info_log("Initiating system shutdown...") self.logger.info("Initiating system shutdown...")
try: try:
await self.coordinator.run_command("sudo shutdown -h now") await self.coordinator.run_command("sudo shutdown -h now")
self._info_log("Shutdown command sent") self.logger.info("Shutdown command sent")
if "system" in self.coordinator.data: if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "off" self.coordinator.data["system"]["status"] = "off"
self.coordinator.async_update_listeners() self.coordinator.async_update_listeners()
except Exception as e: except Exception as e:
self._error_log(f"Failed to shutdown system: {str(e)}") self.logger.error("Failed to shutdown system: %s", str(e))
raise raise

View File

@@ -10,7 +10,9 @@
"username": "用户名", "username": "用户名",
"password": "密码", "password": "密码",
"scan_interval": "数据更新间隔(秒)", "scan_interval": "数据更新间隔(秒)",
"enable_docker": "启用docker控制" "enable_docker": "启用docker控制",
"max_connections": "最大连接数",
"cache_timeout": "缓存过期时间"
} }
}, },
"select_mac": { "select_mac": {

View File

@@ -11,27 +11,9 @@ class UPSManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("ups_manager") self.logger = _LOGGER.getChild("ups_manager")
# 根据Home Assistant的日志级别动态设置 self.logger.setLevel(logging.DEBUG)
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO) self.debug_enabled = False # UPS调试模式开关
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式 self.ups_debug_path = "/config/fn_nas_ups_debug" # UPS调试文件保存路径
self.ups_debug_path = "/config/fn_nas_ups_debug"
def _debug_log(self, message: str):
"""Log ``message`` at DEBUG level, but only when ``self.debug_enabled`` is truthy."""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""Log ``message`` at INFO level through this manager's logger."""
self.logger.info(message)
def _warning_log(self, message: str):
"""Log ``message`` at WARNING level through this manager's logger."""
self.logger.warning(message)
def _error_log(self, message: str):
"""Log ``message`` at ERROR level through this manager's logger."""
self.logger.error(message)
async def get_ups_info(self) -> dict: async def get_ups_info(self) -> dict:
"""获取连接的UPS信息""" """获取连接的UPS信息"""
@@ -49,7 +31,7 @@ class UPSManager:
try: try:
# 尝试使用NUT工具获取UPS信息 # 尝试使用NUT工具获取UPS信息
self._debug_log("尝试使用NUT工具获取UPS信息") self.logger.debug("尝试使用NUT工具获取UPS信息")
output = await self.coordinator.run_command("upsc -l") output = await self.coordinator.run_command("upsc -l")
if output and "No such file" not in output: if output and "No such file" not in output:
@@ -57,11 +39,11 @@ class UPSManager:
ups_names = output.splitlines() ups_names = output.splitlines()
if ups_names: if ups_names:
ups_name = ups_names[0].strip() ups_name = ups_names[0].strip()
self._debug_log(f"发现UPS: {ups_name}") self.logger.debug("发现UPS: %s", ups_name)
# 获取详细的UPS信息 # 获取详细的UPS信息
ups_details = await self.coordinator.run_command(f"upsc {ups_name}") ups_details = await self.coordinator.run_command(f"upsc {ups_name}")
self._debug_log(f"UPS详细信息: {ups_details}") self.logger.debug("UPS详细信息: %s", ups_details)
# 保存UPS数据以便调试 # 保存UPS数据以便调试
self.save_ups_data_for_debug(ups_details) self.save_ups_data_for_debug(ups_details)
@@ -69,20 +51,20 @@ class UPSManager:
# 解析UPS信息 # 解析UPS信息
return self.parse_nut_ups_info(ups_details) return self.parse_nut_ups_info(ups_details)
else: else:
self._debug_log("未找到连接的UPS") self.logger.debug("未找到连接的UPS")
else: else:
self._debug_log("未安装NUT工具尝试备用方法") self.logger.debug("未安装NUT工具尝试备用方法")
# 备用方法尝试直接读取UPS状态 # 备用方法尝试直接读取UPS状态
return await self.get_ups_info_fallback() return await self.get_ups_info_fallback()
except Exception as e: except Exception as e:
self._error_log(f"获取UPS信息时出错: {str(e)}") self.logger.error("获取UPS信息时出错: %s", str(e), exc_info=True)
return ups_info return ups_info
async def get_ups_info_fallback(self) -> dict: async def get_ups_info_fallback(self) -> dict:
"""备用方法获取UPS信息""" """备用方法获取UPS信息"""
self._info_log("尝试备用方法获取UPS信息") self.logger.info("尝试备用方法获取UPS信息")
ups_info = { ups_info = {
"status": "未知", "status": "未知",
"battery_level": "未知", "battery_level": "未知",
@@ -99,7 +81,7 @@ class UPSManager:
# 方法1: 检查USB连接的UPS # 方法1: 检查USB连接的UPS
usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'") usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'")
if usb_ups_output and "No USB UPS" not in usb_ups_output: if usb_ups_output and "No USB UPS" not in usb_ups_output:
self._debug_log(f"检测到USB UPS设备: {usb_ups_output}") self.logger.debug("检测到USB UPS设备: %s", usb_ups_output)
ups_info["ups_type"] = "USB" ups_info["ups_type"] = "USB"
# 尝试从输出中提取型号 # 尝试从输出中提取型号
@@ -129,7 +111,7 @@ class UPSManager:
return ups_info return ups_info
except Exception as e: except Exception as e:
self._error_log(f"备用方法获取UPS信息失败: {str(e)}") self.logger.error("备用方法获取UPS信息失败: %s", str(e))
return ups_info return ups_info
def parse_nut_ups_info(self, ups_output: str) -> dict: def parse_nut_ups_info(self, ups_output: str) -> dict:
@@ -271,6 +253,6 @@ class UPSManager:
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(ups_output) f.write(ups_output)
self._info_log(f"保存UPS数据到 {filename} 用于调试") self.logger.info("保存UPS数据到 %s 用于调试", filename)
except Exception as e: except Exception as e:
self._error_log(f"保存UPS数据失败: {str(e)}") self.logger.error("保存UPS数据失败: %s", str(e))

View File

@@ -8,40 +8,15 @@ class VMManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.vms = [] self.vms = []
self.logger = _LOGGER.getChild("vm_manager")
# 根据Home Assistant的日志级别动态设置
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def _debug_log(self, message: str):
"""Log ``message`` at DEBUG level, but only when ``self.debug_enabled`` is truthy."""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""Log ``message`` at INFO level through this manager's logger."""
self.logger.info(message)
def _warning_log(self, message: str):
"""Log ``message`` at WARNING level through this manager's logger."""
self.logger.warning(message)
def _error_log(self, message: str):
"""Log ``message`` at ERROR level through this manager's logger."""
self.logger.error(message)
async def get_vm_list(self): async def get_vm_list(self):
"""获取虚拟机列表及其状态""" """获取虚拟机列表及其状态"""
try: try:
self._debug_log("开始获取虚拟机列表")
output = await self.coordinator.run_command("virsh list --all") output = await self.coordinator.run_command("virsh list --all")
self._debug_log(f"virsh命令输出: {output}")
self.vms = self._parse_vm_list(output) self.vms = self._parse_vm_list(output)
self._info_log(f"获取到{len(self.vms)}个虚拟机")
return self.vms return self.vms
except Exception as e: except Exception as e:
self._error_log(f"获取虚拟机列表失败: {str(e)}") _LOGGER.error("获取虚拟机列表失败: %s", str(e))
return [] return []
def _parse_vm_list(self, output): def _parse_vm_list(self, output):
@@ -68,18 +43,14 @@ class VMManager:
async def get_vm_title(self, vm_name): async def get_vm_title(self, vm_name):
"""获取虚拟机的标题""" """获取虚拟机的标题"""
try: try:
self._debug_log(f"获取虚拟机{vm_name}的标题")
output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}") output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}")
# 在XML输出中查找<title>标签 # 在XML输出中查找<title>标签
match = re.search(r'<title>(.*?)</title>', output, re.DOTALL) match = re.search(r'<title>(.*?)</title>', output, re.DOTALL)
if match: if match:
title = match.group(1).strip() return match.group(1).strip()
self._debug_log(f"虚拟机{vm_name}标题: {title}")
return title
self._debug_log(f"虚拟机{vm_name}无标题,使用名称")
return vm_name # 如果没有标题,则返回虚拟机名称 return vm_name # 如果没有标题,则返回虚拟机名称
except Exception as e: except Exception as e:
self._error_log(f"获取虚拟机标题失败: {str(e)}") _LOGGER.error("获取虚拟机标题失败: %s", str(e))
return vm_name return vm_name
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
@@ -90,10 +61,8 @@ class VMManager:
command = f"virsh {action} {vm_name}" command = f"virsh {action} {vm_name}"
try: try:
self._info_log(f"执行虚拟机操作: {command}")
await self.coordinator.run_command(command) await self.coordinator.run_command(command)
self._info_log(f"虚拟机{vm_name}操作{action}成功")
return True return True
except Exception as e: except Exception as e:
self._error_log(f"执行虚拟机操作失败: {str(e)}") _LOGGER.error("执行虚拟机操作失败: %s", str(e))
return False return False