1
0

Compare commits

...

22 Commits

Author SHA1 Message Date
04b289861a drop errors, because i hate myself 2025-12-08 15:49:55 -05:00
925afed7a5 Merge branch 'master' of git.nixlabs.dev:clair/CVM-Sentry 2025-12-08 15:49:02 -05:00
0959d17403 Make These Logs Suck less 2025-12-08 15:48:59 -05:00
bc5bb81330 BULLSHIT 2025-11-10 02:16:21 -05:00
cdfbc7e55f make snapshot stdout more readable 2025-10-25 10:00:31 -04:00
c479a86f29 Merge branch 'master' of git.nixlabs.dev:clair/CVM-Sentry 2025-10-18 21:45:53 -04:00
5e1fcf37d0 This is the Greatest Commit of All Time 2025-10-18 21:45:49 -04:00
a48ebd4b72 ...???? 2025-10-18 17:46:43 -04:00
63dc3600d5 unauth name change 2025-10-18 17:45:45 -04:00
2154c29515 eror 2025-10-18 17:16:03 -04:00
81ba086b39 update readme 2025-10-15 11:09:50 -04:00
78b57f10c4 WHY WAS THE DATE LOCAL TZ??? CLAIR U IDIOT also swap vm date to date vm 2025-10-14 22:09:39 -04:00
8070f79164 yyyy-mm-dd 2025-10-14 21:10:08 -04:00
918fae093f unbone the fuckin. file name 2025-10-14 20:36:12 -04:00
195e2799a5 change filename just one tad more 2025-10-14 20:31:24 -04:00
359e366fe0 change timestamp format 2025-10-14 20:30:06 -04:00
b1f608d14b minimize size webp nonsense 2025-10-14 18:36:30 -04:00
09d1a77ea5 i somehow missed this entirely 2025-10-14 18:14:27 -04:00
299aa4e0b1 changes changes 2025-10-14 13:28:59 -04:00
167ac1858b remove ollama 2025-10-13 22:15:15 -04:00
e80b3f764f no more ai bye 2025-10-13 16:41:05 -04:00
f846d55d44 add ollama 2025-10-13 15:41:29 -04:00
3 changed files with 215 additions and 207 deletions

View File

@@ -1,3 +1,5 @@
# CVM-Sentry # CVM-Sentry
Python application for taking screenshots and logging chat from a CollabVM instance.
Python application for logging chat (Actual usefulness... EVENTUALLY) # HEAVY DISCLAIMER
A lot of the code was written by the geriatric Claude by Anthropic, in a mix of laziness and inability to write good code. Some of it has been cleaned up, and the bot is considered to be in a stable state. Pull requests in the form of patches sent to `clair@nixlabs.dev` are welcome.

View File

@@ -1,4 +1,5 @@
from typing import List from typing import List
from urllib.parse import urlparse
from cvmlib import ( from cvmlib import (
guac_decode, guac_decode,
guac_encode, guac_encode,
@@ -17,15 +18,10 @@ from io import BytesIO
from PIL import Image from PIL import Image
import base64 import base64
import imagehash import imagehash
LOG_LEVEL = getattr(config, "log_level", "INFO") LOG_LEVEL = getattr(config, "log_level", "INFO")
# Prepare logs # Prepare logs
if not os.path.exists("logs"): log_format = logging.Formatter("[%(asctime)s:%(name)s] %(levelname)s - %(message)s")
os.makedirs("logs")
log_format = logging.Formatter(
"[%(asctime)s:%(name)s] %(levelname)s - %(message)s"
)
stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(log_format) stdout_handler.setFormatter(log_format)
log = logging.getLogger("CVMSentry") log = logging.getLogger("CVMSentry")
@@ -34,8 +30,6 @@ log.addHandler(stdout_handler)
vms = {} vms = {}
vm_botuser = {} vm_botuser = {}
STATE = CollabVMState.WS_DISCONNECTED
def get_origin_from_ws_url(ws_url: str) -> str: def get_origin_from_ws_url(ws_url: str) -> str:
domain = ( domain = (
@@ -63,7 +57,7 @@ async def periodic_snapshot_task():
log.info("Starting periodic snapshot task") log.info("Starting periodic snapshot task")
while True: while True:
try: try:
await asyncio.sleep(5) await asyncio.sleep(config.snapshot_cadence)
log.debug("Running periodic framebuffer snapshot capture...") log.debug("Running periodic framebuffer snapshot capture...")
save_tasks = [] save_tasks = []
@@ -72,32 +66,36 @@ async def periodic_snapshot_task():
if not vm_data.get("framebuffer"): if not vm_data.get("framebuffer"):
continue continue
# Create directory structure if it doesn't exist # Create directory structure if it doesn't exist - [date]/[vm] structure in UTC
date_str = datetime.now().strftime("%b-%d-%Y") date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
snapshot_dir = os.path.join("logs", "webp", vm_name, date_str) snapshot_dir = os.path.join(config.log_directory, "webp", date_str, vm_name)
os.makedirs(snapshot_dir, exist_ok=True) os.makedirs(snapshot_dir, exist_ok=True)
# Get current epoch timestamp in milliseconds # Generate formatted timestamp in UTC
epoch_timestamp = int(datetime.now().timestamp() * 1000) timestamp = datetime.now(timezone.utc).strftime("%H-%M-%S")
filename = f"{epoch_timestamp}.webp" filename = f"{timestamp}.webp"
filepath = os.path.join(snapshot_dir, filename) filepath = os.path.join(snapshot_dir, filename)
# Create a hash of the framebuffer for comparison # Get framebuffer reference (no copy needed)
framebuffer = vm_data["framebuffer"] framebuffer = vm_data["framebuffer"]
if not framebuffer: if not framebuffer:
continue continue
# Calculate difference hash for the image # Calculate difference hash asynchronously to avoid blocking
current_hash = str(imagehash.dhash(framebuffer)) current_hash = await asyncio.to_thread(
lambda: str(imagehash.dhash(framebuffer))
)
# Only save if the framebuffer has changed since last snapshot # Only save if the framebuffer has changed since last snapshot
if current_hash != vm_data.get("last_frame_hash"): if current_hash != vm_data.get("last_frame_hash"):
# Create a copy of the image to avoid race conditions # Pass framebuffer directly without copying
img_copy = framebuffer.copy() save_tasks.append(
# Create and store task for asynchronous saving asyncio.create_task(
save_tasks.append(asyncio.create_task( save_image_async(
save_image_async(img_copy, filepath, vm_name, vm_data, current_hash) framebuffer, filepath, vm_name, vm_data, current_hash
)) )
)
)
# Wait for all save tasks to complete # Wait for all save tasks to complete
if save_tasks: if save_tasks:
@@ -107,31 +105,28 @@ async def periodic_snapshot_task():
log.error(f"Error in periodic snapshot task: {e}") log.error(f"Error in periodic snapshot task: {e}")
# Continue running even if there's an error # Continue running even if there's an error
async def save_image_async(image, filepath, vm_name, vm_data, current_hash): async def save_image_async(image, filepath, vm_name, vm_data, current_hash):
"""Save an image to disk asynchronously.""" """Save an image to disk asynchronously."""
try: try:
# Run the image saving in a thread pool to avoid blocking # Run the image saving in a thread pool to avoid blocking
await asyncio.to_thread( await asyncio.to_thread(
image.save, image.save, filepath, format="WEBP", quality=65, method=6, minimize_size=True
filepath,
format="WEBP",
quality=65,
method=6
) )
vm_data["last_frame_hash"] = current_hash vm_data["last_frame_hash"] = current_hash
log.info(f"Saved snapshot of {vm_name} to {filepath}") log.info(f"Saved snapshot of {vm_name} ({datetime.now(timezone.utc).strftime('%H:%M:%S')} UTC)")
except Exception as e: except Exception as e:
log.error(f"Failed to save snapshot for {vm_name}: {e}") log.error(f"Failed to save snapshot for {vm_name}: {e}")
async def connect(vm_name: str): async def connect(vm_obj: dict):
global STATE log.debug(f"Connecting to VM at {vm_obj['ws_url']} with origin {get_origin_from_ws_url(vm_obj['ws_url'])}")
global vms global vms
global vm_botuser global vm_botuser
if vm_name not in config.vms: fqdn = urlparse(vm_obj["ws_url"]).netloc
log.error(f"VM '{vm_name}' not found in configuration.") STATE = CollabVMState.WS_DISCONNECTED
return log_label = vm_obj.get("log_label") or f"{fqdn}-{vm_obj.get('node', '')}"
vms[vm_name] = { vms[log_label] = {
"turn_queue": [], "turn_queue": [],
"active_turn_user": None, "active_turn_user": None,
"users": {}, "users": {},
@@ -139,36 +134,43 @@ async def connect(vm_name: str):
"last_frame_hash": None, "last_frame_hash": None,
"size": (0, 0), "size": (0, 0),
} }
uri = config.vms[vm_name] ws_url = vm_obj["ws_url"]
log_file_path = os.path.join( log_directory = getattr(config, "log_directory", "./logs")
getattr(config, "log_directory", "logs"), f"{vm_name}.json" # Create VM-specific log directory
) vm_log_directory = os.path.join(log_directory, log_label)
if not os.path.exists(log_file_path): os.makedirs(vm_log_directory, exist_ok=True)
with open(log_file_path, "w") as log_file:
log_file.write("{}") origin = Origin(vm_obj.get("origin_override", get_origin_from_ws_url(ws_url)))
async with websockets.connect( async with websockets.connect(
uri=uri, uri=ws_url,
subprotocols=[Subprotocol("guacamole")], subprotocols=[Subprotocol("guacamole")],
origin=Origin(get_origin_from_ws_url(uri)), origin=Origin(origin),
user_agent_header="cvmsentry/1 (https://git.nixlabs.dev/clair/cvmsentry)", user_agent_header="cvmsentry/1 (https://git.nixlabs.dev/clair/cvmsentry)",
) as websocket: ) as websocket:
STATE = CollabVMState.WS_CONNECTED STATE = CollabVMState.WS_CONNECTED
log.info(f"Connected to VM '{vm_name}' at {uri}") log.info(f"Connected to VM '{log_label}' at {ws_url}")
await send_guac(websocket, "rename", "") await send_guac(websocket, "rename", config.unauth_name)
await send_guac(websocket, "connect", vm_name) await send_guac(websocket, "connect", vm_obj["node"])
if vm_name not in vm_botuser: if log_label not in vm_botuser:
vm_botuser[vm_name] = "" vm_botuser[log_label] = ""
# response = await websocket.recv()
async for message in websocket: async for message in websocket:
decoded: List[str] = guac_decode(str(message)) decoded: List[str] = guac_decode(str(message))
match decoded: match decoded:
case ["nop"]: case ["nop"]:
await send_guac(websocket, "nop") await send_guac(websocket, "nop")
case ["auth", config.auth_server]: case ["auth", auth_server]:
await asyncio.sleep(1) await asyncio.sleep(1)
await send_guac( if vm_obj.get("auth"):
websocket, "login", config.credentials["session_auth"] await send_guac(
) websocket,
"login",
vm_obj["auth"]["session_auth"],
)
else:
log.error(
f"Auth server '{auth_server}' not recognized for VM '{log_label}'"
)
case [ case [
"connect", "connect",
connection_status, connection_status,
@@ -179,11 +181,11 @@ async def connect(vm_name: str):
if connection_status == "1": if connection_status == "1":
STATE = CollabVMState.VM_CONNECTED STATE = CollabVMState.VM_CONNECTED
log.info( log.info(
f"Connected to VM '{vm_name}' successfully. Turns enabled: {bool(int(turns_enabled))}, Votes enabled: {bool(int(votes_enabled))}, Uploads enabled: {bool(int(uploads_enabled))}" f"Connected to VM '{log_label}' successfully. Turns enabled: {bool(int(turns_enabled))}, Votes enabled: {bool(int(votes_enabled))}, Uploads enabled: {bool(int(uploads_enabled))}"
) )
else: else:
log.error( log.debug(
f"Failed to connect to VM '{vm_name}'. Connection status: {connection_status}" f"Failed to connect to VM '{log_label}'. Connection status: {connection_status}"
) )
STATE = CollabVMState.WS_DISCONNECTED STATE = CollabVMState.WS_DISCONNECTED
await websocket.close() await websocket.close()
@@ -195,19 +197,19 @@ async def connect(vm_name: str):
== CollabVMClientRenameStatus.SUCCEEDED == CollabVMClientRenameStatus.SUCCEEDED
): ):
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name}: {vm_botuser[vm_name]} -> {new_name}" f"({STATE.name} - {log_label}) Bot rename on VM {log_label}: {vm_botuser[log_label]} -> {new_name}"
) )
vm_botuser[vm_name] = new_name vm_botuser[log_label] = new_name
else: else:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Bot rename on VM {vm_name} failed with status {CollabVMClientRenameStatus(int(status)).name}" f"({STATE.name} - {log_label}) Bot rename on VM {log_label} failed with status {CollabVMClientRenameStatus(int(status)).name}"
) )
case ["1", old_name, new_name]: case ["1", old_name, new_name]:
if old_name in vms[vm_name]["users"]: if old_name in vms[log_label]["users"]:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) User rename on VM {vm_name}: {old_name} -> {new_name}" f"({STATE.name} - {log_label}) User rename on VM {log_label}: {old_name} -> {new_name}"
) )
vms[vm_name]["users"][new_name] = vms[vm_name][ vms[log_label]["users"][new_name] = vms[log_label][
"users" "users"
].pop(old_name) ].pop(old_name)
case ["login", "1"]: case ["login", "1"]:
@@ -218,13 +220,12 @@ async def connect(vm_name: str):
) )
case ["chat", user, message, *backlog]: case ["chat", user, message, *backlog]:
system_message = user == "" system_message = user == ""
if system_message: if system_message or backlog:
continue continue
if not backlog: log.info(f"[{log_label} - {user}]: {message}")
log.info(f"[{vm_name} - {user}]: {message}")
def get_rank(username: str) -> CollabVMRank: def get_rank(username: str) -> CollabVMRank:
return vms[vm_name]["users"].get(username, {}).get("rank") return vms[log_label]["users"].get(username, {}).get("rank")
def admin_check(username: str) -> bool: def admin_check(username: str) -> bool:
return ( return (
@@ -236,46 +237,36 @@ async def connect(vm_name: str):
utc_day = utc_now.strftime("%Y-%m-%d") utc_day = utc_now.strftime("%Y-%m-%d")
timestamp = utc_now.isoformat() timestamp = utc_now.isoformat()
with open(log_file_path, "r+") as log_file: # Get daily log file path
try: daily_log_path = os.path.join(vm_log_directory, f"{utc_day}.json")
log_data = json.load(log_file)
except json.JSONDecodeError:
log_data = {}
if utc_day not in log_data: # Load existing log data or create new
log_data[utc_day] = [] if os.path.exists(daily_log_path):
with open(daily_log_path, "r") as log_file:
try:
log_data = json.load(log_file)
except json.JSONDecodeError:
log_data = []
else:
log_data = []
if backlog: log_data.append(
pass {
# for i in range(0, len(backlog), 2): "type": "chat",
# backlog_user = backlog[i] "timestamp": timestamp,
# backlog_message = backlog[i + 1] "username": user,
# if not any(entry["message"] == backlog_message and entry["username"] == backlog_user for entry in log_data[utc_day]): "message": message,
# log.info(f"[{vm_name} - {backlog_user} (backlog)]: {backlog_message}") }
# log_data[utc_day].append({ )
# "timestamp": timestamp,
# "username": backlog_user,
# "message": backlog_message
# })
log_data[utc_day].append( with open(daily_log_path, "w") as log_file:
{
"type": "chat",
"timestamp": timestamp,
"username": user,
"message": message,
}
)
log_file.seek(0)
json.dump(log_data, log_file, indent=4) json.dump(log_data, log_file, indent=4)
log_file.truncate()
if config.commands["enabled"] and message.startswith( if config.commands["enabled"] and message.startswith(
config.commands["prefix"] config.commands["prefix"]
): ):
command = ( command_full = message[len(config.commands["prefix"]):].strip().lower()
message[len(config.commands["prefix"]) :].strip().lower() command = command_full.split(" ")[0] if " " in command_full else command_full
)
match command: match command:
case "whoami": case "whoami":
await send_chat_message( await send_chat_message(
@@ -293,7 +284,7 @@ async def connect(vm_name: str):
if not admin_check(user): if not admin_check(user):
continue continue
log.info( log.info(
f"({STATE.name} - {vm_name}) Dumping user list for VM {vm_name}: {vms[vm_name]['users']}" f"({STATE.name} - {log_label}) Dumping user list for VM {log_label}: {vms[log_label]['users']}"
) )
await send_chat_message( await send_chat_message(
websocket, f"Dumped user list to console." websocket, f"Dumped user list to console."
@@ -303,37 +294,41 @@ async def connect(vm_name: str):
user = list[i * 2] user = list[i * 2]
rank = CollabVMRank(int(list[i * 2 + 1])) rank = CollabVMRank(int(list[i * 2 + 1]))
if user in vms[vm_name]["users"]: if user in vms[log_label]["users"]:
vms[vm_name]["users"][user]["rank"] = rank vms[log_label]["users"][user]["rank"] = rank
log.info( log.info(
f"[{vm_name}] User '{user}' rank updated to {rank.name}." f"[{log_label}] User '{user}' rank updated to {rank.name}."
) )
else: else:
vms[vm_name]["users"][user] = {"rank": rank} vms[log_label]["users"][user] = {"rank": rank}
log.info( log.info(
f"[{vm_name}] User '{user}' connected with rank {rank.name}." f"[{log_label}] User '{user}' connected with rank {rank.name}."
) )
case ["turn", _, "0"]: case ["turn", _, "0"]:
if STATE < CollabVMState.LOGGED_IN: if STATE < CollabVMState.LOGGED_IN:
continue continue
if ( if (
vms[vm_name]["active_turn_user"] is None vms[log_label]["active_turn_user"] is None
and not vms[vm_name]["turn_queue"] and not vms[log_label]["turn_queue"]
): ):
# log.debug(f"({STATE.name} - {vm_name}) Incoming queue exhaustion matches the VM's state. Dropping update.") # log.debug(f"({STATE.name} - {log_label}) Incoming queue exhaustion matches the VM's state. Dropping update.")
continue continue
vms[vm_name]["active_turn_user"] = None vms[log_label]["active_turn_user"] = None
vms[vm_name]["turn_queue"] = [] vms[log_label]["turn_queue"] = []
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Turn queue is naturally exhausted." f"({STATE.name} - {log_label}) Turn queue is naturally exhausted."
) )
case ["size", "0", width, height]: case ["size", "0", width, height]:
log.debug(f"({STATE.name} - {vm_name}) !!! Framebuffer size update: {width}x{height} !!!") log.debug(
vms[vm_name]["size"] = (int(width), int(height)) f"({STATE.name} - {log_label}) !!! Framebuffer size update: {width}x{height} !!!"
)
vms[log_label]["size"] = (int(width), int(height))
case ["png", "0", "0", "0", "0", full_frame_b64]: case ["png", "0", "0", "0", "0", full_frame_b64]:
try: try:
log.debug(f"({STATE.name} - {vm_name}) !!! Received full framebuffer update !!!") log.debug(
expected_width, expected_height = vms[vm_name]["size"] f"({STATE.name} - {log_label}) !!! Received full framebuffer update !!!"
)
expected_width, expected_height = vms[log_label]["size"]
# Decode the base64 data to get the PNG image # Decode the base64 data to get the PNG image
frame_data = base64.b64decode(full_frame_b64) frame_data = base64.b64decode(full_frame_b64)
@@ -342,30 +337,37 @@ async def connect(vm_name: str):
# Validate image size and handle partial frames # Validate image size and handle partial frames
if expected_width > 0 and expected_height > 0: if expected_width > 0 and expected_height > 0:
if frame_img.size != (expected_width, expected_height): if frame_img.size != (expected_width, expected_height):
log.warning(f"({STATE.name} - {vm_name}) Partial framebuffer update: " log.debug(
f"expected {expected_width}x{expected_height}, got {frame_img.size}") f"({STATE.name} - {log_label}) Partial framebuffer update: "
f"expected {expected_width}x{expected_height}, got {frame_img.size}"
)
# Create a new image of expected size if no framebuffer exists # Create a new image of expected size if no framebuffer exists
if vms[vm_name]["framebuffer"] is None: if vms[log_label]["framebuffer"] is None:
vms[vm_name]["framebuffer"] = Image.new('RGB', (expected_width, expected_height)) vms[log_label]["framebuffer"] = Image.new(
"RGB", (expected_width, expected_height)
)
# Only update the portion that was received # Only update the portion that was received - modify in place
if vms[vm_name]["framebuffer"]: if vms[log_label]["framebuffer"]:
# Create a copy of the current framebuffer to modify # Paste directly onto existing framebuffer
updated_img = vms[vm_name]["framebuffer"].copy() vms[log_label]["framebuffer"].paste(frame_img, (0, 0))
# Paste the new partial frame at position (0,0) frame_img = vms[log_label]["framebuffer"]
updated_img.paste(frame_img, (0, 0))
# Use this as our new framebuffer
frame_img = updated_img
# Update the framebuffer with the new image # Update the framebuffer with the new image
vms[vm_name]["framebuffer"] = frame_img vms[log_label]["framebuffer"] = frame_img
log.debug(f"({STATE.name} - {vm_name}) Framebuffer updated with full frame, size: {frame_img.size}") log.debug(
f"({STATE.name} - {log_label}) Framebuffer updated with full frame, size: {frame_img.size}"
)
except Exception as e: except Exception as e:
log.error(f"({STATE.name} - {vm_name}) Failed to process full framebuffer update: {e}") log.error(
f"({STATE.name} - {log_label}) Failed to process full framebuffer update: {e}"
)
case ["png", "0", "0", x, y, rect_b64]: case ["png", "0", "0", x, y, rect_b64]:
try: try:
log.debug(f"({STATE.name} - {vm_name}) Received partial framebuffer update at position ({x}, {y})") log.debug(
f"({STATE.name} - {log_label}) Received partial framebuffer update at position ({x}, {y})"
)
x, y = int(x), int(y) x, y = int(x), int(y)
# Decode the base64 data to get the PNG image fragment # Decode the base64 data to get the PNG image fragment
@@ -373,121 +375,125 @@ async def connect(vm_name: str):
fragment_img = Image.open(BytesIO(frame_data)) fragment_img = Image.open(BytesIO(frame_data))
# If we don't have a framebuffer yet or it's incompatible, create one # If we don't have a framebuffer yet or it's incompatible, create one
if vms[vm_name]["framebuffer"] is None: if vms[log_label]["framebuffer"] is None:
# drop # drop
continue continue
# If we have a valid framebuffer, update it with the fragment # If we have a valid framebuffer, update it with the fragment
if vms[vm_name]["framebuffer"]: if vms[log_label]["framebuffer"]:
# Create a copy to modify # Paste directly onto existing framebuffer (no copy needed)
updated_img = vms[vm_name]["framebuffer"].copy() vms[log_label]["framebuffer"].paste(fragment_img, (x, y))
# Paste the fragment at the specified position log.debug(
updated_img.paste(fragment_img, (x, y)) f"({STATE.name} - {log_label}) Updated framebuffer with fragment at ({x}, {y}), fragment size: {fragment_img.size}"
# Update the framebuffer )
vms[vm_name]["framebuffer"] = updated_img
log.debug(f"({STATE.name} - {vm_name}) Updated framebuffer with fragment at ({x}, {y}), fragment size: {fragment_img.size}")
else: else:
log.warning(f"({STATE.name} - {vm_name}) Cannot update framebuffer - no base framebuffer exists") log.warning(
f"({STATE.name} - {log_label}) Cannot update framebuffer - no base framebuffer exists"
)
except Exception as e: except Exception as e:
log.error(f"({STATE.name} - {vm_name}) Failed to process partial framebuffer update: {e}") log.error(
f"({STATE.name} - {log_label}) Failed to process partial framebuffer update: {e}"
)
case ["turn", turn_time, count, current_turn, *queue]: case ["turn", turn_time, count, current_turn, *queue]:
if ( if (
queue == vms[vm_name]["turn_queue"] queue == vms[log_label]["turn_queue"]
and current_turn == vms[vm_name]["active_turn_user"] and current_turn == vms[log_label]["active_turn_user"]
): ):
continue continue
for user in vms[vm_name]["users"]: for user in vms[log_label]["users"]:
vms[vm_name]["turn_queue"] = queue vms[log_label]["turn_queue"] = queue
vms[vm_name]["active_turn_user"] = ( vms[log_label]["active_turn_user"] = (
current_turn if current_turn != "" else None current_turn if current_turn != "" else None
) )
if current_turn: if current_turn:
log.info(f"[{vm_name}] It's now {current_turn}'s turn. Queue: {queue}") log.info(
f"[{log_label}] It's now {current_turn}'s turn. Queue: {queue}"
)
utc_now = datetime.now(timezone.utc) utc_now = datetime.now(timezone.utc)
utc_day = utc_now.strftime("%Y-%m-%d") utc_day = utc_now.strftime("%Y-%m-%d")
timestamp = utc_now.isoformat() timestamp = utc_now.isoformat()
with open(log_file_path, "r+") as log_file: # Get daily log file path
try: daily_log_path = os.path.join(vm_log_directory, f"{utc_day}.json")
log_data = json.load(log_file)
except json.JSONDecodeError:
log_data = {}
if utc_day not in log_data: # Load existing log data or create new
log_data[utc_day] = [] if os.path.exists(daily_log_path):
with open(daily_log_path, "r") as log_file:
try:
log_data = json.load(log_file)
except json.JSONDecodeError:
log_data = []
else:
log_data = []
log_data[utc_day].append( log_data.append(
{ {
"type": "turn", "type": "turn",
"timestamp": timestamp, "timestamp": timestamp,
"active_turn_user": current_turn, "active_turn_user": current_turn,
"queue": queue, "queue": queue,
} }
) )
log_file.seek(0) with open(daily_log_path, "w") as log_file:
json.dump(log_data, log_file, indent=4) json.dump(log_data, log_file, indent=4)
log_file.truncate()
case ["remuser", count, *list]: case ["remuser", count, *list]:
for i in range(int(count)): for i in range(int(count)):
username = list[i] username = list[i]
if username in vms[vm_name]["users"]: if username in vms[log_label]["users"]:
del vms[vm_name]["users"][username] del vms[log_label]["users"][username]
log.info(f"[{vm_name}] User '{username}' left.") log.info(f"[{log_label}] User '{username}' left.")
case ( case ["flag", *args] | ["png", *args] | ["sync", *args]:
["flag", *args] | ["png", *args] | ["sync", *args]
):
continue continue
case _: case _:
if decoded is not None: if decoded is not None:
log.debug( log.debug(
f"({STATE.name} - {vm_name}) Unhandled message: {decoded}" f"({STATE.name} - {log_label}) Unhandled message: {decoded}"
) )
log.info(f"CVM-Sentry started")
log.info(f"({STATE.name}) CVM-Sentry started") for vm_dict_label, vm_obj in config.vms.items():
for vm in config.vms.keys(): def start_vm_thread(vm_obj: dict):
asyncio.run(connect(vm_obj))
def start_vm_thread(vm_name: str):
asyncio.run(connect(vm_name))
async def main(): async def main():
async def connect_with_reconnect(vm_name: str): async def connect_with_reconnect(vm_obj: dict):
while True: while True:
try: try:
await connect(vm_name) await connect(vm_obj)
except websockets.exceptions.ConnectionClosedError as e: except websockets.exceptions.ConnectionClosedError as e:
log.warning( log.error(
f"Connection to VM '{vm_name}' closed with error: {e}. Reconnecting..." f"Connection to VM '{vm_obj['ws_url']}' closed with error: {e}. Reconnecting..."
) )
await asyncio.sleep(5) # Wait before attempting to reconnect await asyncio.sleep(0)
except websockets.exceptions.ConnectionClosedOK: except websockets.exceptions.ConnectionClosedOK:
log.warning( log.warning(
f"Connection to VM '{vm_name}' closed cleanly (code 1005). Reconnecting..." f"Connection to VM '{vm_obj['ws_url']}' closed cleanly (code 1005). Reconnecting..."
) )
await asyncio.sleep(5) # Wait before attempting to reconnect await asyncio.sleep(0)
except websockets.exceptions.InvalidStatus as e: except websockets.exceptions.InvalidStatus as e:
log.error( log.debug(
f"Failed to connect to VM '{vm_name}' with status code: {e}. Reconnecting..." f"Failed to connect to VM '{vm_obj['ws_url']}' with status code: {e}. Reconnecting..."
) )
await asyncio.sleep(10) # Wait longer for HTTP errors await asyncio.sleep(0)
except websockets.exceptions.WebSocketException as e: except websockets.exceptions.WebSocketException as e:
log.error( log.error(
f"WebSocket error connecting to VM '{vm_name}': {e}. Reconnecting..." f"WebSocket error connecting to VM '{vm_obj['ws_url']}': {e}. Reconnecting..."
) )
await asyncio.sleep(5) await asyncio.sleep(5)
except Exception as e: except Exception as e:
log.error( log.error(
f"Unexpected error connecting to VM '{vm_name}': {e}. Reconnecting..." f"Unexpected error connecting to VM '{vm_obj['ws_url']}': {e}. Reconnecting..."
) )
await asyncio.sleep(10) # Wait longer for unexpected errors await asyncio.sleep(0)
# Create tasks for VM connections # Create tasks for VM connections
vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.keys()] vm_tasks = [connect_with_reconnect(vm) for vm in config.vms.values()]
# Add periodic snapshot task # Add periodic snapshot task
snapshot_task = periodic_snapshot_task() snapshot_task = periodic_snapshot_task()

2
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. # This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]] [[package]]
name = "imagehash" name = "imagehash"