1
0

This is the Greatest Commit of All Time

This commit is contained in:
2025-10-18 21:45:49 -04:00
parent 63dc3600d5
commit 5e1fcf37d0

View File

@@ -1,4 +1,5 @@
from typing import List from typing import List
from urllib.parse import urlparse
from cvmlib import ( from cvmlib import (
guac_decode, guac_decode,
guac_encode, guac_encode,
@@ -118,14 +119,14 @@ async def save_image_async(image, filepath, vm_name, vm_data, current_hash):
log.error(f"Failed to save snapshot for {vm_name}: {e}") log.error(f"Failed to save snapshot for {vm_name}: {e}")
async def connect(vm_name: str): async def connect(vm_obj: dict):
STATE = CollabVMState.WS_DISCONNECTED log.info(f"Connecting to VM at {vm_obj['ws_url']} with origin {get_origin_from_ws_url(vm_obj['ws_url'])}")
global vms global vms
global vm_botuser global vm_botuser
if vm_name not in config.vms: fqdn = urlparse(vm_obj["ws_url"]).netloc
log.error(f"VM '{vm_name}' not found in configuration.") STATE = CollabVMState.WS_DISCONNECTED
return log_label = vm_obj.get("log_label") or f"{fqdn}-{vm_obj.get('node', '')}"
vms[vm_name] = { vms[log_label] = {
"turn_queue": [], "turn_queue": [],
"active_turn_user": None, "active_turn_user": None,
"users": {}, "users": {},
@@ -133,36 +134,46 @@ async def connect(vm_name: str):
"last_frame_hash": None, "last_frame_hash": None,
"size": (0, 0), "size": (0, 0),
} }
uri = config.vms[vm_name] ws_url = vm_obj["ws_url"]
log_file_path = os.path.join( log_directory = getattr(config, "log_directory", "./logs")
getattr(config, "log_directory", "./logs"), f"{vm_name}.json" # Ensure the log directory exists
) os.makedirs(log_directory, exist_ok=True)
log_file_path = os.path.join(log_directory, f"{log_label}.json")
if not os.path.exists(log_file_path): if not os.path.exists(log_file_path):
with open(log_file_path, "w") as log_file: with open(log_file_path, "w") as log_file:
log_file.write("{}") log_file.write("{}")
origin = Origin(vm_obj.get("origin_override", get_origin_from_ws_url(ws_url)))
async with websockets.connect( async with websockets.connect(
uri=uri, uri=ws_url,
subprotocols=[Subprotocol("guacamole")], subprotocols=[Subprotocol("guacamole")],
origin=Origin(get_origin_from_ws_url(uri)), origin=Origin(origin),
user_agent_header="cvmsentry/1 (https://git.nixlabs.dev/clair/cvmsentry)", user_agent_header="cvmsentry/1 (https://git.nixlabs.dev/clair/cvmsentry)",
) as websocket: ) as websocket:
STATE = CollabVMState.WS_CONNECTED STATE = CollabVMState.WS_CONNECTED
log.info(f"Connected to VM '{vm_name}' at {uri}") log.info(f"Connected to VM '{log_label}' at {ws_url}")
await send_guac(websocket, "rename", config.unauth_name) await send_guac(websocket, "rename", config.unauth_name)
await send_guac(websocket, "connect", vm_name) await send_guac(websocket, "connect", vm_obj["node"])
if vm_name not in vm_botuser: if log_label not in vm_botuser:
vm_botuser[vm_name] = "" vm_botuser[log_label] = ""
# response = await websocket.recv()
async for message in websocket: async for message in websocket:
decoded: List[str] = guac_decode(str(message)) decoded: List[str] = guac_decode(str(message))
match decoded: match decoded:
case ["nop"]: case ["nop"]:
await send_guac(websocket, "nop") await send_guac(websocket, "nop")
case ["auth", config.auth_server]: case ["auth", auth_server]:
await asyncio.sleep(1) await asyncio.sleep(1)
await send_guac( if vm_obj.get("auth"):
websocket, "login", config.credentials["session_auth"] await send_guac(
) websocket,
"login",
vm_obj["auth"]["session_auth"],
)
else:
log.error(
f"Auth server '{auth_server}' not recognized for VM '{log_label}'"
)
case [ case [
"connect", "connect",
connection_status, connection_status,
@@ -173,11 +184,11 @@ async def connect(vm_name: str):
if connection_status == "1": if connection_status == "1":
STATE = CollabVMState.VM_CONNECTED STATE = CollabVMState.VM_CONNECTED
log.info( log.info(
f"Connected to VM '{vm_name}' successfully. Turns enabled: {bool(int(turns_enabled))}, Votes enabled: {bool(int(votes_enabled))}, Uploads enabled: {bool(int(uploads_enabled))}" f"Connected to VM '{log_label}' successfully. Turns enabled: {bool(int(turns_enabled))}, Votes enabled: {bool(int(votes_enabled))}, Uploads enabled: {bool(int(uploads_enabled))}"
) )
else: else:
log.error( log.error(
f"Failed to connect to VM '{vm_name}'. Connection status: {connection_status}" f"Failed to connect to VM '{log_label}'. Connection status: {connection_status}"
) )
STATE = CollabVMState.WS_DISCONNECTED STATE = CollabVMState.WS_DISCONNECTED
await websocket.close() await websocket.close()
@@ -189,19 +200,19 @@ async def connect(vm_name: str):
== CollabVMClientRenameStatus.SUCCEEDED == CollabVMClientRenameStatus.SUCCEEDED
): ):
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name}: {vm_botuser[vm_name]} -> {new_name}" f"({STATE.name} - {log_label}) Bot rename on VM {log_label}: {vm_botuser[log_label]} -> {new_name}"
) )
vm_botuser[vm_name] = new_name vm_botuser[log_label] = new_name
else: else:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name} failed with status {CollabVMClientRenameStatus(int(status)).name}" f"({STATE.name} - {log_label}) Bot rename on VM {log_label} failed with status {CollabVMClientRenameStatus(int(status)).name}"
) )
case ["1", old_name, new_name]: case ["1", old_name, new_name]:
if old_name in vms[vm_name]["users"]: if old_name in vms[log_label]["users"]:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) User rename on VM {vm_name}: {old_name} -> {new_name}" f"({STATE.name} - {log_label}) User rename on VM {log_label}: {old_name} -> {new_name}"
) )
vms[vm_name]["users"][new_name] = vms[vm_name][ vms[log_label]["users"][new_name] = vms[log_label][
"users" "users"
].pop(old_name) ].pop(old_name)
case ["login", "1"]: case ["login", "1"]:
@@ -214,10 +225,10 @@ async def connect(vm_name: str):
system_message = user == "" system_message = user == ""
if system_message or backlog: if system_message or backlog:
continue continue
log.info(f"[{vm_name} - {user}]: {message}") log.info(f"[{log_label} - {user}]: {message}")
def get_rank(username: str) -> CollabVMRank: def get_rank(username: str) -> CollabVMRank:
return vms[vm_name]["users"].get(username, {}).get("rank") return vms[log_label]["users"].get(username, {}).get("rank")
def admin_check(username: str) -> bool: def admin_check(username: str) -> bool:
return ( return (
@@ -283,7 +294,7 @@ async def connect(vm_name: str):
if not admin_check(user): if not admin_check(user):
continue continue
log.info( log.info(
f"({STATE.name} - {vm_name}) Dumping user list for VM {vm_name}: {vms[vm_name]['users']}" f"({STATE.name} - {log_label}) Dumping user list for VM {log_label}: {vms[log_label]['users']}"
) )
await send_chat_message( await send_chat_message(
websocket, f"Dumped user list to console." websocket, f"Dumped user list to console."
@@ -293,41 +304,41 @@ async def connect(vm_name: str):
user = list[i * 2] user = list[i * 2]
rank = CollabVMRank(int(list[i * 2 + 1])) rank = CollabVMRank(int(list[i * 2 + 1]))
if user in vms[vm_name]["users"]: if user in vms[log_label]["users"]:
vms[vm_name]["users"][user]["rank"] = rank vms[log_label]["users"][user]["rank"] = rank
log.info( log.info(
f"[{vm_name}] User '{user}' rank updated to {rank.name}." f"[{log_label}] User '{user}' rank updated to {rank.name}."
) )
else: else:
vms[vm_name]["users"][user] = {"rank": rank} vms[log_label]["users"][user] = {"rank": rank}
log.info( log.info(
f"[{vm_name}] User '{user}' connected with rank {rank.name}." f"[{log_label}] User '{user}' connected with rank {rank.name}."
) )
case ["turn", _, "0"]: case ["turn", _, "0"]:
if STATE < CollabVMState.LOGGED_IN: if STATE < CollabVMState.LOGGED_IN:
continue continue
if ( if (
vms[vm_name]["active_turn_user"] is None vms[log_label]["active_turn_user"] is None
and not vms[vm_name]["turn_queue"] and not vms[log_label]["turn_queue"]
): ):
# log.debug(f"({STATE.name} - {vm_name}) Incoming queue exhaustion matches the VM's state. Dropping update.") # log.debug(f"({STATE.name} - {log_label}) Incoming queue exhaustion matches the VM's state. Dropping update.")
continue continue
vms[vm_name]["active_turn_user"] = None vms[log_label]["active_turn_user"] = None
vms[vm_name]["turn_queue"] = [] vms[log_label]["turn_queue"] = []
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Turn queue is naturally exhausted." f"({STATE.name} - {log_label}) Turn queue is naturally exhausted."
) )
case ["size", "0", width, height]: case ["size", "0", width, height]:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) !!! Framebuffer size update: {width}x{height} !!!" f"({STATE.name} - {log_label}) !!! Framebuffer size update: {width}x{height} !!!"
) )
vms[vm_name]["size"] = (int(width), int(height)) vms[log_label]["size"] = (int(width), int(height))
case ["png", "0", "0", "0", "0", full_frame_b64]: case ["png", "0", "0", "0", "0", full_frame_b64]:
try: try:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) !!! Received full framebuffer update !!!" f"({STATE.name} - {log_label}) !!! Received full framebuffer update !!!"
) )
expected_width, expected_height = vms[vm_name]["size"] expected_width, expected_height = vms[log_label]["size"]
# Decode the base64 data to get the PNG image # Decode the base64 data to get the PNG image
frame_data = base64.b64decode(full_frame_b64) frame_data = base64.b64decode(full_frame_b64)
@@ -337,35 +348,35 @@ async def connect(vm_name: str):
if expected_width > 0 and expected_height > 0: if expected_width > 0 and expected_height > 0:
if frame_img.size != (expected_width, expected_height): if frame_img.size != (expected_width, expected_height):
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Partial framebuffer update: " f"({STATE.name} - {log_label}) Partial framebuffer update: "
f"expected {expected_width}x{expected_height}, got {frame_img.size}" f"expected {expected_width}x{expected_height}, got {frame_img.size}"
) )
# Create a new image of expected size if no framebuffer exists # Create a new image of expected size if no framebuffer exists
if vms[vm_name]["framebuffer"] is None: if vms[log_label]["framebuffer"] is None:
vms[vm_name]["framebuffer"] = Image.new( vms[log_label]["framebuffer"] = Image.new(
"RGB", (expected_width, expected_height) "RGB", (expected_width, expected_height)
) )
# Only update the portion that was received - modify in place # Only update the portion that was received - modify in place
if vms[vm_name]["framebuffer"]: if vms[log_label]["framebuffer"]:
# Paste directly onto existing framebuffer # Paste directly onto existing framebuffer
vms[vm_name]["framebuffer"].paste(frame_img, (0, 0)) vms[log_label]["framebuffer"].paste(frame_img, (0, 0))
frame_img = vms[vm_name]["framebuffer"] frame_img = vms[log_label]["framebuffer"]
# Update the framebuffer with the new image # Update the framebuffer with the new image
vms[vm_name]["framebuffer"] = frame_img vms[log_label]["framebuffer"] = frame_img
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Framebuffer updated with full frame, size: {frame_img.size}" f"({STATE.name} - {log_label}) Framebuffer updated with full frame, size: {frame_img.size}"
) )
except Exception as e: except Exception as e:
log.error( log.error(
f"({STATE.name} - {vm_name}) Failed to process full framebuffer update: {e}" f"({STATE.name} - {log_label}) Failed to process full framebuffer update: {e}"
) )
case ["png", "0", "0", x, y, rect_b64]: case ["png", "0", "0", x, y, rect_b64]:
try: try:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Received partial framebuffer update at position ({x}, {y})" f"({STATE.name} - {log_label}) Received partial framebuffer update at position ({x}, {y})"
) )
x, y = int(x), int(y) x, y = int(x), int(y)
@@ -374,39 +385,39 @@ async def connect(vm_name: str):
fragment_img = Image.open(BytesIO(frame_data)) fragment_img = Image.open(BytesIO(frame_data))
# If we don't have a framebuffer yet or it's incompatible, create one # If we don't have a framebuffer yet or it's incompatible, create one
if vms[vm_name]["framebuffer"] is None: if vms[log_label]["framebuffer"] is None:
# drop # drop
continue continue
# If we have a valid framebuffer, update it with the fragment # If we have a valid framebuffer, update it with the fragment
if vms[vm_name]["framebuffer"]: if vms[log_label]["framebuffer"]:
# Paste directly onto existing framebuffer (no copy needed) # Paste directly onto existing framebuffer (no copy needed)
vms[vm_name]["framebuffer"].paste(fragment_img, (x, y)) vms[log_label]["framebuffer"].paste(fragment_img, (x, y))
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Updated framebuffer with fragment at ({x}, {y}), fragment size: {fragment_img.size}" f"({STATE.name} - {log_label}) Updated framebuffer with fragment at ({x}, {y}), fragment size: {fragment_img.size}"
) )
else: else:
log.warning( log.warning(
f"({STATE.name} - {vm_name}) Cannot update framebuffer - no base framebuffer exists" f"({STATE.name} - {log_label}) Cannot update framebuffer - no base framebuffer exists"
) )
except Exception as e: except Exception as e:
log.error( log.error(
f"({STATE.name} - {vm_name}) Failed to process partial framebuffer update: {e}" f"({STATE.name} - {log_label}) Failed to process partial framebuffer update: {e}"
) )
case ["turn", turn_time, count, current_turn, *queue]: case ["turn", turn_time, count, current_turn, *queue]:
if ( if (
queue == vms[vm_name]["turn_queue"] queue == vms[log_label]["turn_queue"]
and current_turn == vms[vm_name]["active_turn_user"] and current_turn == vms[log_label]["active_turn_user"]
): ):
continue continue
for user in vms[vm_name]["users"]: for user in vms[log_label]["users"]:
vms[vm_name]["turn_queue"] = queue vms[log_label]["turn_queue"] = queue
vms[vm_name]["active_turn_user"] = ( vms[log_label]["active_turn_user"] = (
current_turn if current_turn != "" else None current_turn if current_turn != "" else None
) )
if current_turn: if current_turn:
log.info( log.info(
f"[{vm_name}] It's now {current_turn}'s turn. Queue: {queue}" f"[{log_label}] It's now {current_turn}'s turn. Queue: {queue}"
) )
utc_now = datetime.now(timezone.utc) utc_now = datetime.now(timezone.utc)
@@ -438,59 +449,58 @@ async def connect(vm_name: str):
case ["remuser", count, *list]: case ["remuser", count, *list]:
for i in range(int(count)): for i in range(int(count)):
username = list[i] username = list[i]
if username in vms[vm_name]["users"]: if username in vms[log_label]["users"]:
del vms[vm_name]["users"][username] del vms[log_label]["users"][username]
log.info(f"[{vm_name}] User '{username}' left.") log.info(f"[{log_label}] User '{username}' left.")
case ["flag", *args] | ["png", *args] | ["sync", *args]: case ["flag", *args] | ["png", *args] | ["sync", *args]:
continue continue
case _: case _:
if decoded is not None: if decoded is not None:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Unhandled message: {decoded}" f"({STATE.name} - {log_label}) Unhandled message: {decoded}"
) )
log.info(f"CVM-Sentry started") log.info(f"CVM-Sentry started")
for vm in config.vms.keys(): for vm_dict_label, vm_obj in config.vms.items():
def start_vm_thread(vm_name: str): def start_vm_thread(vm_obj: dict):
asyncio.run(connect(vm_name)) asyncio.run(connect(vm_obj))
async def main(): async def main():
async def connect_with_reconnect(vm_name: str): async def connect_with_reconnect(vm_obj: dict):
while True: while True:
try: try:
await connect(vm_name) await connect(vm_obj)
except websockets.exceptions.ConnectionClosedError as e: except websockets.exceptions.ConnectionClosedError as e:
log.error( log.error(
f"Connection to VM '{vm_name}' closed with error: {e}. Reconnecting..." f"Connection to VM '{vm_obj['ws_url']}' closed with error: {e}. Reconnecting..."
) )
await asyncio.sleep(5) # Wait before attempting to reconnect await asyncio.sleep(5) # Wait before attempting to reconnect
except websockets.exceptions.ConnectionClosedOK: except websockets.exceptions.ConnectionClosedOK:
log.warning( log.warning(
f"Connection to VM '{vm_name}' closed cleanly (code 1005). Reconnecting..." f"Connection to VM '{vm_obj['ws_url']}' closed cleanly (code 1005). Reconnecting..."
) )
await asyncio.sleep(5) # Wait before attempting to reconnect await asyncio.sleep(5) # Wait before attempting to reconnect
except websockets.exceptions.InvalidStatus as e: except websockets.exceptions.InvalidStatus as e:
log.error( log.error(
f"Failed to connect to VM '{vm_name}' with status code: {e}. Reconnecting..." f"Failed to connect to VM '{vm_obj['ws_url']}' with status code: {e}. Reconnecting..."
) )
await asyncio.sleep(10) # Wait longer for HTTP errors await asyncio.sleep(10) # Wait longer for HTTP errors
except websockets.exceptions.WebSocketException as e: except websockets.exceptions.WebSocketException as e:
log.error( log.error(
f"WebSocket error connecting to VM '{vm_name}': {e}. Reconnecting..." f"WebSocket error connecting to VM '{vm_obj['ws_url']}': {e}. Reconnecting..."
) )
await asyncio.sleep(5) await asyncio.sleep(5)
except Exception as e: except Exception as e:
log.error( log.error(
f"Unexpected error connecting to VM '{vm_name}': {e}. Reconnecting..." f"Unexpected error connecting to VM '{vm_obj['ws_url']}': {e}. Reconnecting..."
) )
await asyncio.sleep(10) # Wait longer for unexpected errors await asyncio.sleep(10) # Wait longer for unexpected errors
# Create tasks for VM connections # Create tasks for VM connections
vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.keys()] vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.values()]
# Add periodic snapshot task # Add periodic snapshot task
snapshot_task = periodic_snapshot_task() snapshot_task = periodic_snapshot_task()