diff --git a/cvmsentry.py b/cvmsentry.py index e9a08de..3772827 100644 --- a/cvmsentry.py +++ b/cvmsentry.py @@ -7,6 +7,7 @@ import logging import sys from datetime import datetime, timezone import json +from snapper import snap_all_vms LOG_LEVEL = getattr(config, "log_level", "INFO") @@ -22,7 +23,7 @@ log = logging.getLogger("CVMSentry") log.setLevel(LOG_LEVEL) log.addHandler(stdout_handler) -users = {} +vms = {} vm_botuser = {} STATE = CollabVMState.WS_DISCONNECTED @@ -44,13 +45,34 @@ async def send_chat_message(websocket, message: str): async def send_guac(websocket, *args: str): await websocket.send(guac_encode(list(args))) +async def periodic_snapshot_task(): + """Background task that captures VM snapshots.""" + log.info("Starting periodic snapshot task") + + while True: + try: + await asyncio.sleep(30) # Wait 30 seconds + log.debug("Running periodic snapshot capture...") + + # Create snapshots directory with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + snapshot_dir = os.path.join("logs", timestamp) + + # Capture all VMs + await snap_all_vms(snapshot_dir) + + except Exception as e: + log.error(f"Error in periodic snapshot task: {e}") + # Continue running even if there's an error + async def connect(vm_name: str): global STATE - global users + global vms global vm_botuser if vm_name not in config.vms: log.error(f"VM '{vm_name}' not found in configuration.") return + vms[vm_name] = {"turn_queue": [], "active_turn_user": None, "users": {}} uri = config.vms[vm_name] log_file_path = os.path.join(getattr(config, "log_directory", "logs"), f"{vm_name}.json") if not os.path.exists(log_file_path): @@ -66,8 +88,6 @@ async def connect(vm_name: str): log.info(f"Connected to VM '{vm_name}' at {uri}") await send_guac(websocket, "rename", "") await send_guac(websocket, "connect", vm_name) - if vm_name not in users: - users[vm_name] = {} if vm_name not in vm_botuser: vm_botuser[vm_name] = "" # response = await websocket.recv() @@ -96,12 +116,13 @@ async def connect(vm_name: str): else: log.debug(f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name} failed with status {CollabVMClientRenameStatus(int(status)).name}") case ["1", old_name, new_name]: - if old_name in users[vm_name]: + if old_name in vms[vm_name]["users"]: log.debug(f"({STATE.name} - {vm_name}) User rename on VM {vm_name}: {old_name} -> {new_name}") - users[vm_name][new_name] = users[vm_name].pop(old_name) + vms[vm_name]["users"][new_name] = vms[vm_name]["users"].pop(old_name) case ["login", "1"]: STATE = CollabVMState.LOGGED_IN - await send_chat_message(websocket, random.choice(config.autostart_messages)) + if config.send_autostart and config.autostart_messages: + await send_chat_message(websocket, random.choice(config.autostart_messages)) case ["chat", user, message, *backlog]: system_message = (user == "") if system_message: @@ -109,6 +130,12 @@ async def connect(vm_name: str): if not backlog: log.info(f"[{vm_name} - {user}]: {message}") + def get_rank(username: str) -> CollabVMRank: + return vms[vm_name]["users"].get(username, {}).get("rank") + + def admin_check(username: str) -> bool: + return username in config.admins and get_rank(username) > CollabVMRank.Unregistered + utc_now = datetime.now(timezone.utc) utc_day = utc_now.strftime("%Y-%m-%d") timestamp = utc_now.isoformat() @@ -136,6 +163,7 @@ async def connect(vm_name: str): # }) log_data[utc_day].append({ + "type": "chat", "timestamp": timestamp, "username": user, "message": message @@ -144,44 +172,79 @@ async def connect(vm_name: str): log_file.seek(0) json.dump(log_data, log_file, indent=4) log_file.truncate() + if config.commands["enabled"] and message.startswith(config.commands["prefix"]): command = message[len(config.commands["prefix"]):].strip().lower() match command: case "whoami": - await send_chat_message(websocket, f"You are {user} with rank {users[vm_name][user]['rank'].name}.") + await send_chat_message(websocket, f"You are {user} with rank {get_rank(user).name}.") case "about": await send_chat_message(websocket, config.responses.get("about", "CVM-Sentry (NO RESPONSE CONFIGURED)")) case "dump": - if user != "dfu": - await send_chat_message(websocket, "You do not have permission to use this command.") + if not admin_check(user): continue - log.debug(f"({STATE.name} - {vm_name}) Dumping user list for VM {vm_name}: {users[vm_name]}") + log.debug(f"({STATE.name} - {vm_name}) Dumping user list for VM {vm_name}: {vms[vm_name]['users']}") await send_chat_message(websocket, f"Dumped user list to console.") case ["adduser", count, *list]: for i in range(int(count)): user = list[i * 2] rank = CollabVMRank(int(list[i * 2 + 1])) - if user in users[vm_name]: - users[vm_name][user]["rank"] = rank + if user in vms[vm_name]["users"]: + vms[vm_name]["users"][user]["rank"] = rank log.info(f"[{vm_name}] User '{user}' rank updated to {rank.name}.") else: - users[vm_name][user] = {"rank": rank, "turn_active": False} + vms[vm_name]["users"][user] = {"rank": rank} log.info(f"[{vm_name}] User '{user}' connected with rank {rank.name}.") case ["turn", _, "0"]: if STATE < CollabVMState.LOGGED_IN: continue - log.debug(f"({STATE.name} - {vm_name}) Turn queue exhausted.") + if vms[vm_name]["active_turn_user"] is None and not vms[vm_name]["turn_queue"]: + #log.debug(f"({STATE.name} - {vm_name}) Incoming queue exhaustion matches the VM's state. Dropping update.") + continue + vms[vm_name]["active_turn_user"] = None + vms[vm_name]["turn_queue"] = [] + log.debug(f"({STATE.name} - {vm_name}) Turn queue is naturally exhausted.") case ["turn", turn_time, count, current_turn, *queue]: - log.debug(f"({STATE.name} - {vm_name}) Turn queue updated: {queue} | Current turn: {current_turn} | Time left for current turn: {int(turn_time)//1000}s") - for user in users[vm_name]: - users[vm_name][user]["turn_active"] = (user == current_turn) + if queue == vms[vm_name]["turn_queue"] and current_turn == vms[vm_name]["active_turn_user"]: + #log.debug(f"({STATE.name} - {vm_name}) Incoming turn update matches the VM's state. Dropping update.") + continue + for user in vms[vm_name]["users"]: + vms[vm_name]["turn_queue"] = queue + vms[vm_name]["active_turn_user"] = current_turn if current_turn != "" else None + if current_turn: + utc_now = datetime.now(timezone.utc) + utc_day = utc_now.strftime("%Y-%m-%d") + timestamp = utc_now.isoformat() + + with open(log_file_path, "r+") as log_file: + try: + log_data = json.load(log_file) + except json.JSONDecodeError: + log_data = {} + + if utc_day not in log_data: + log_data[utc_day] = [] + + log_data[utc_day].append({ + "type": "turn", + "timestamp": timestamp, + "active_turn_user": current_turn, + "queue": queue + }) + + log_file.seek(0) + json.dump(log_data, log_file, indent=4) + log_file.truncate() + log.debug(f"({STATE.name} - {vm_name}) Turn update: turn_time={turn_time}, count={count}, current_turn={current_turn}, queue={queue}") + + case ["remuser", count, *list]: for i in range(int(count)): username = list[i] - if username in users[vm_name]: - del users[vm_name][username] + if username in vms[vm_name]["users"]: + del vms[vm_name]["users"][username] log.info(f"[{vm_name}] User '{username}' left.") - case ["sync", *args] | ["png", *args] | ["flag", *args] | ["size", *args]: + case ["flag", *args] | ["size", *args] | ["png", *args] | ["sync", *args]: continue case _: if decoded is not None: @@ -195,6 +258,7 @@ for vm in config.vms.keys(): asyncio.run(connect(vm_name)) async def main(): + async def connect_with_reconnect(vm_name: str): while True: try: @@ -205,8 +269,24 @@ for vm in config.vms.keys(): except websockets.exceptions.ConnectionClosedOK: log.warning(f"Connection to VM '{vm_name}' closed cleanly (code 1005). Reconnecting...") await asyncio.sleep(5) # Wait before attempting to reconnect + except websockets.exceptions.InvalidStatus as e: + log.error(f"Failed to connect to VM '{vm_name}' with status code: {e}. Reconnecting...") + await asyncio.sleep(10) # Wait longer for HTTP errors + except websockets.exceptions.WebSocketException as e: + log.error(f"WebSocket error connecting to VM '{vm_name}': {e}. Reconnecting...") + await asyncio.sleep(5) + except Exception as e: + log.error(f"Unexpected error connecting to VM '{vm_name}': {e}. Reconnecting...") + await asyncio.sleep(10) # Wait longer for unexpected errors - tasks = [connect_with_reconnect(vm) for vm in config.vms.keys()] - await asyncio.gather(*tasks) + # Create tasks for VM connections + vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.keys()] + + # Add periodic snapshot task + snapshot_task = periodic_snapshot_task() + + # Run all tasks concurrently + all_tasks = [snapshot_task] + vm_tasks + await asyncio.gather(*all_tasks) asyncio.run(main()) \ No newline at end of file