From 5e1fcf37d08e8e0cd0e78c59eea913e726631cb6 Mon Sep 17 00:00:00 2001 From: Clair Delafuente Date: Sat, 18 Oct 2025 21:45:49 -0400 Subject: [PATCH] This is the Greatest Commit of All Time --- cvmsentry.py | 180 +++++++++++++++++++++++++++------------------------ 1 file changed, 95 insertions(+), 85 deletions(-) diff --git a/cvmsentry.py b/cvmsentry.py index e64f06e..b7d6c47 100644 --- a/cvmsentry.py +++ b/cvmsentry.py @@ -1,4 +1,5 @@ from typing import List +from urllib.parse import urlparse from cvmlib import ( guac_decode, guac_encode, @@ -118,14 +119,14 @@ async def save_image_async(image, filepath, vm_name, vm_data, current_hash): log.error(f"Failed to save snapshot for {vm_name}: {e}") -async def connect(vm_name: str): - STATE = CollabVMState.WS_DISCONNECTED +async def connect(vm_obj: dict): + log.info(f"Connecting to VM at {vm_obj['ws_url']} with origin {get_origin_from_ws_url(vm_obj['ws_url'])}") global vms global vm_botuser - if vm_name not in config.vms: - log.error(f"VM '{vm_name}' not found in configuration.") - return - vms[vm_name] = { + fqdn = urlparse(vm_obj["ws_url"]).netloc + STATE = CollabVMState.WS_DISCONNECTED + log_label = vm_obj.get("log_label") or f"{fqdn}-{vm_obj.get('node', '')}" + vms[log_label] = { "turn_queue": [], "active_turn_user": None, "users": {}, @@ -133,36 +134,46 @@ async def connect(vm_name: str): "last_frame_hash": None, "size": (0, 0), } - uri = config.vms[vm_name] - log_file_path = os.path.join( - getattr(config, "log_directory", "./logs"), f"{vm_name}.json" - ) + ws_url = vm_obj["ws_url"] + log_directory = getattr(config, "log_directory", "./logs") + # Ensure the log directory exists + os.makedirs(log_directory, exist_ok=True) + log_file_path = os.path.join(log_directory, f"{log_label}.json") if not os.path.exists(log_file_path): with open(log_file_path, "w") as log_file: log_file.write("{}") + + origin = Origin(vm_obj.get("origin_override", get_origin_from_ws_url(ws_url))) + async with websockets.connect( - uri=uri, + uri=ws_url, subprotocols=[Subprotocol("guacamole")], - origin=Origin(get_origin_from_ws_url(uri)), + origin=Origin(origin), user_agent_header="cvmsentry/1 (https://git.nixlabs.dev/clair/cvmsentry)", ) as websocket: STATE = CollabVMState.WS_CONNECTED - log.info(f"Connected to VM '{vm_name}' at {uri}") + log.info(f"Connected to VM '{log_label}' at {ws_url}") await send_guac(websocket, "rename", config.unauth_name) - await send_guac(websocket, "connect", vm_name) - if vm_name not in vm_botuser: - vm_botuser[vm_name] = "" - # response = await websocket.recv() + await send_guac(websocket, "connect", vm_obj["node"]) + if log_label not in vm_botuser: + vm_botuser[log_label] = "" async for message in websocket: decoded: List[str] = guac_decode(str(message)) match decoded: case ["nop"]: await send_guac(websocket, "nop") - case ["auth", config.auth_server]: + case ["auth", auth_server]: await asyncio.sleep(1) - await send_guac( - websocket, "login", config.credentials["session_auth"] - ) + if vm_obj.get("auth"): + await send_guac( + websocket, + "login", + vm_obj["auth"]["session_auth"], + ) + else: + log.error( + f"Auth server '{auth_server}' not recognized for VM '{log_label}'" + ) case [ "connect", connection_status, @@ -173,11 +184,11 @@ async def connect(vm_name: str): if connection_status == "1": STATE = CollabVMState.VM_CONNECTED log.info( - f"Connected to VM '{vm_name}' successfully. Turns enabled: {bool(int(turns_enabled))}, Votes enabled: {bool(int(votes_enabled))}, Uploads enabled: {bool(int(uploads_enabled))}" + f"Connected to VM '{log_label}' successfully. Turns enabled: {bool(int(turns_enabled))}, Votes enabled: {bool(int(votes_enabled))}, Uploads enabled: {bool(int(uploads_enabled))}" ) else: log.error( - f"Failed to connect to VM '{vm_name}'. Connection status: {connection_status}" + f"Failed to connect to VM '{log_label}'. Connection status: {connection_status}" ) STATE = CollabVMState.WS_DISCONNECTED await websocket.close() @@ -189,19 +200,19 @@ async def connect(vm_name: str): == CollabVMClientRenameStatus.SUCCEEDED ): log.debug( - f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name}: {vm_botuser[vm_name]} -> {new_name}" + f"({STATE.name} - {log_label}) Bot rename on VM {log_label}: {vm_botuser[log_label]} -> {new_name}" ) - vm_botuser[vm_name] = new_name + vm_botuser[log_label] = new_name else: log.debug( - f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name} failed with status {CollabVMClientRenameStatus(int(status)).name}" + f"({STATE.name} - {log_label}) Bot rename on VM {log_label} failed with status {CollabVMClientRenameStatus(int(status)).name}" ) case ["1", old_name, new_name]: - if old_name in vms[vm_name]["users"]: + if old_name in vms[log_label]["users"]: log.debug( - f"({STATE.name} - {vm_name}) User rename on VM {vm_name}: {old_name} -> {new_name}" + f"({STATE.name} - {log_label}) User rename on VM {log_label}: {old_name} -> {new_name}" ) - vms[vm_name]["users"][new_name] = vms[vm_name][ + vms[log_label]["users"][new_name] = vms[log_label][ "users" ].pop(old_name) case ["login", "1"]: @@ -214,10 +225,10 @@ async def connect(vm_name: str): system_message = user == "" if system_message or backlog: continue - log.info(f"[{vm_name} - {user}]: {message}") + log.info(f"[{log_label} - {user}]: {message}") def get_rank(username: str) -> CollabVMRank: - return vms[vm_name]["users"].get(username, {}).get("rank") + return vms[log_label]["users"].get(username, {}).get("rank") def admin_check(username: str) -> bool: return ( @@ -283,7 +294,7 @@ async def connect(vm_name: str): if not admin_check(user): continue log.info( - f"({STATE.name} - {vm_name}) Dumping user list for VM {vm_name}: {vms[vm_name]['users']}" + f"({STATE.name} - {log_label}) Dumping user list for VM {log_label}: {vms[log_label]['users']}" ) await send_chat_message( websocket, f"Dumped user list to console." @@ -293,41 +304,41 @@ async def connect(vm_name: str): user = list[i * 2] rank = CollabVMRank(int(list[i * 2 + 1])) - if user in vms[vm_name]["users"]: - vms[vm_name]["users"][user]["rank"] = rank + if user in vms[log_label]["users"]: + vms[log_label]["users"][user]["rank"] = rank log.info( - f"[{vm_name}] User '{user}' rank updated to {rank.name}." + f"[{log_label}] User '{user}' rank updated to {rank.name}." ) else: - vms[vm_name]["users"][user] = {"rank": rank} + vms[log_label]["users"][user] = {"rank": rank} log.info( - f"[{vm_name}] User '{user}' connected with rank {rank.name}." + f"[{log_label}] User '{user}' connected with rank {rank.name}." ) case ["turn", _, "0"]: if STATE < CollabVMState.LOGGED_IN: continue if ( - vms[vm_name]["active_turn_user"] is None - and not vms[vm_name]["turn_queue"] + vms[log_label]["active_turn_user"] is None + and not vms[log_label]["turn_queue"] ): - # log.debug(f"({STATE.name} - {vm_name}) Incoming queue exhaustion matches the VM's state. Dropping update.") + # log.debug(f"({STATE.name} - {log_label}) Incoming queue exhaustion matches the VM's state. Dropping update.") continue - vms[vm_name]["active_turn_user"] = None - vms[vm_name]["turn_queue"] = [] + vms[log_label]["active_turn_user"] = None + vms[log_label]["turn_queue"] = [] log.debug( - f"({STATE.name} - {vm_name}) Turn queue is naturally exhausted." + f"({STATE.name} - {log_label}) Turn queue is naturally exhausted." ) case ["size", "0", width, height]: log.debug( - f"({STATE.name} - {vm_name}) !!! Framebuffer size update: {width}x{height} !!!" + f"({STATE.name} - {log_label}) !!! Framebuffer size update: {width}x{height} !!!" ) - vms[vm_name]["size"] = (int(width), int(height)) + vms[log_label]["size"] = (int(width), int(height)) case ["png", "0", "0", "0", "0", full_frame_b64]: try: log.debug( - f"({STATE.name} - {vm_name}) !!! Received full framebuffer update !!!" + f"({STATE.name} - {log_label}) !!! Received full framebuffer update !!!" ) - expected_width, expected_height = vms[vm_name]["size"] + expected_width, expected_height = vms[log_label]["size"] # Decode the base64 data to get the PNG image frame_data = base64.b64decode(full_frame_b64) @@ -337,35 +348,35 @@ async def connect(vm_name: str): if expected_width > 0 and expected_height > 0: if frame_img.size != (expected_width, expected_height): log.debug( - f"({STATE.name} - {vm_name}) Partial framebuffer update: " + f"({STATE.name} - {log_label}) Partial framebuffer update: " f"expected {expected_width}x{expected_height}, got {frame_img.size}" ) # Create a new image of expected size if no framebuffer exists - if vms[vm_name]["framebuffer"] is None: - vms[vm_name]["framebuffer"] = Image.new( + if vms[log_label]["framebuffer"] is None: + vms[log_label]["framebuffer"] = Image.new( "RGB", (expected_width, expected_height) ) # Only update the portion that was received - modify in place - if vms[vm_name]["framebuffer"]: + if vms[log_label]["framebuffer"]: # Paste directly onto existing framebuffer - vms[vm_name]["framebuffer"].paste(frame_img, (0, 0)) - frame_img = vms[vm_name]["framebuffer"] + vms[log_label]["framebuffer"].paste(frame_img, (0, 0)) + frame_img = vms[log_label]["framebuffer"] # Update the framebuffer with the new image - vms[vm_name]["framebuffer"] = frame_img + vms[log_label]["framebuffer"] = frame_img log.debug( - f"({STATE.name} - {vm_name}) Framebuffer updated with full frame, size: {frame_img.size}" + f"({STATE.name} - {log_label}) Framebuffer updated with full frame, size: {frame_img.size}" ) except Exception as e: log.error( - f"({STATE.name} - {vm_name}) Failed to process full framebuffer update: {e}" + f"({STATE.name} - {log_label}) Failed to process full framebuffer update: {e}" ) case ["png", "0", "0", x, y, rect_b64]: try: log.debug( - f"({STATE.name} - {vm_name}) Received partial framebuffer update at position ({x}, {y})" + f"({STATE.name} - {log_label}) Received partial framebuffer update at position ({x}, {y})" ) x, y = int(x), int(y) @@ -374,39 +385,39 @@ async def connect(vm_name: str): fragment_img = Image.open(BytesIO(frame_data)) # If we don't have a framebuffer yet or it's incompatible, create one - if vms[vm_name]["framebuffer"] is None: + if vms[log_label]["framebuffer"] is None: # drop continue # If we have a valid framebuffer, update it with the fragment - if vms[vm_name]["framebuffer"]: + if vms[log_label]["framebuffer"]: # Paste directly onto existing framebuffer (no copy needed) - vms[vm_name]["framebuffer"].paste(fragment_img, (x, y)) + vms[log_label]["framebuffer"].paste(fragment_img, (x, y)) log.debug( - f"({STATE.name} - {vm_name}) Updated framebuffer with fragment at ({x}, {y}), fragment size: {fragment_img.size}" + f"({STATE.name} - {log_label}) Updated framebuffer with fragment at ({x}, {y}), fragment size: {fragment_img.size}" ) else: log.warning( - f"({STATE.name} - {vm_name}) Cannot update framebuffer - no base framebuffer exists" + f"({STATE.name} - {log_label}) Cannot update framebuffer - no base framebuffer exists" ) except Exception as e: log.error( - f"({STATE.name} - {vm_name}) Failed to process partial framebuffer update: {e}" + f"({STATE.name} - {log_label}) Failed to process partial framebuffer update: {e}" ) case ["turn", turn_time, count, current_turn, *queue]: if ( - queue == vms[vm_name]["turn_queue"] - and current_turn == vms[vm_name]["active_turn_user"] + queue == vms[log_label]["turn_queue"] + and current_turn == vms[log_label]["active_turn_user"] ): continue - for user in vms[vm_name]["users"]: - vms[vm_name]["turn_queue"] = queue - vms[vm_name]["active_turn_user"] = ( + for user in vms[log_label]["users"]: + vms[log_label]["turn_queue"] = queue + vms[log_label]["active_turn_user"] = ( current_turn if current_turn != "" else None ) if current_turn: log.info( - f"[{vm_name}] It's now {current_turn}'s turn. Queue: {queue}" + f"[{log_label}] It's now {current_turn}'s turn. Queue: {queue}" ) utc_now = datetime.now(timezone.utc) @@ -438,59 +449,58 @@ async def connect(vm_name: str): case ["remuser", count, *list]: for i in range(int(count)): username = list[i] - if username in vms[vm_name]["users"]: - del vms[vm_name]["users"][username] - log.info(f"[{vm_name}] User '{username}' left.") + if username in vms[log_label]["users"]: + del vms[log_label]["users"][username] + log.info(f"[{log_label}] User '{username}' left.") case ["flag", *args] | ["png", *args] | ["sync", *args]: continue case _: if decoded is not None: log.debug( - f"({STATE.name} - {vm_name}) Unhandled message: {decoded}" + f"({STATE.name} - {log_label}) Unhandled message: {decoded}" ) - log.info(f"CVM-Sentry started") -for vm in config.vms.keys(): +for vm_dict_label, vm_obj in config.vms.items(): - def start_vm_thread(vm_name: str): - asyncio.run(connect(vm_name)) + def start_vm_thread(vm_obj: dict): + asyncio.run(connect(vm_obj)) async def main(): - async def connect_with_reconnect(vm_name: str): + async def connect_with_reconnect(vm_obj: dict): while True: try: - await connect(vm_name) + await connect(vm_obj) except websockets.exceptions.ConnectionClosedError as e: log.error( - f"Connection to VM '{vm_name}' closed with error: {e}. Reconnecting..." + f"Connection to VM '{vm_obj['ws_url']}' closed with error: {e}. Reconnecting..." ) await asyncio.sleep(5) # Wait before attempting to reconnect except websockets.exceptions.ConnectionClosedOK: log.warning( - f"Connection to VM '{vm_name}' closed cleanly (code 1005). Reconnecting..." + f"Connection to VM '{vm_obj['ws_url']}' closed cleanly (code 1005). Reconnecting..." ) await asyncio.sleep(5) # Wait before attempting to reconnect except websockets.exceptions.InvalidStatus as e: log.error( - f"Failed to connect to VM '{vm_name}' with status code: {e}. Reconnecting..." + f"Failed to connect to VM '{vm_obj['ws_url']}' with status code: {e}. Reconnecting..." ) await asyncio.sleep(10) # Wait longer for HTTP errors except websockets.exceptions.WebSocketException as e: log.error( - f"WebSocket error connecting to VM '{vm_name}': {e}. Reconnecting..." + f"WebSocket error connecting to VM '{vm_obj['ws_url']}': {e}. Reconnecting..." ) await asyncio.sleep(5) except Exception as e: log.error( - f"Unexpected error connecting to VM '{vm_name}': {e}. Reconnecting..." + f"Unexpected error connecting to VM '{vm_obj['ws_url']}': {e}. Reconnecting..." ) await asyncio.sleep(10) # Wait longer for unexpected errors # Create tasks for VM connections - vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.keys()] + vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.values()] # Add periodic snapshot task snapshot_task = periodic_snapshot_task()