diff --git a/cvmsentry.py b/cvmsentry.py index 3772827..aa73b85 100644 --- a/cvmsentry.py +++ b/cvmsentry.py @@ -7,7 +7,10 @@ import logging import sys from datetime import datetime, timezone import json -from snapper import snap_all_vms +from io import BytesIO +from PIL import Image +import base64 +import hashlib LOG_LEVEL = getattr(config, "log_level", "INFO") @@ -46,20 +49,48 @@ async def send_guac(websocket, *args: str): await websocket.send(guac_encode(list(args))) async def periodic_snapshot_task(): - """Background task that captures VM snapshots.""" + """Background task that saves VM framebuffers as snapshots in WEBP format.""" log.info("Starting periodic snapshot task") while True: try: - await asyncio.sleep(30) # Wait 30 seconds - log.debug("Running periodic snapshot capture...") + await asyncio.sleep(10) # Wait 10 seconds + log.debug("Running periodic framebuffer snapshot capture...") - # Create snapshots directory with timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - snapshot_dir = os.path.join("logs", timestamp) + # Save framebuffers for all VMs + timestamp = int(datetime.now(timezone.utc).timestamp()) # Use Unix timestamp for filenames + current_day = datetime.now(timezone.utc).strftime("%b-%d-%Y") - # Capture all VMs - await snap_all_vms(snapshot_dir) + for vm_name, vm_data in vms.items(): + # Create snapshots directory for each VM, then by current day + snapshot_dir = os.path.join("logs", "webp", vm_name, current_day) + os.makedirs(snapshot_dir, exist_ok=True) + + framebuffer = vm_data.get("framebuffer") + if framebuffer: + framebuffer.seek(0) + + # Calculate hash of the framebuffer to detect duplicates + framebuffer_data = framebuffer.getvalue() + current_hash = hashlib.md5(framebuffer_data).hexdigest() + + # Check if this frame is the same as the last one + if vm_data.get("last_frame_hash") == current_hash: + log.debug(f"Skipping duplicate frame for VM '{vm_name}'") + continue + + # Save the new frame + framebuffer.seek(0) + image = Image.open(framebuffer) + snapshot_path = os.path.join(snapshot_dir, f"{vm_name}_{timestamp}.webp") + image.save(snapshot_path, format="WEBP", quality=65, optimize=True, method=6) + + # Update the hash for this VM + vm_data["last_frame_hash"] = current_hash + + log.info(f"Saved snapshot for VM '{vm_name}' to {snapshot_path}") + else: + log.warning(f"No framebuffer available for VM '{vm_name}'") except Exception as e: log.error(f"Error in periodic snapshot task: {e}") @@ -72,7 +103,7 @@ async def connect(vm_name: str): if vm_name not in config.vms: log.error(f"VM '{vm_name}' not found in configuration.") return - vms[vm_name] = {"turn_queue": [], "active_turn_user": None, "users": {}} + vms[vm_name] = {"turn_queue": [], "active_turn_user": None, "users": {}, "framebuffer": None, "last_frame_hash": None} uri = config.vms[vm_name] log_file_path = os.path.join(getattr(config, "log_directory", "logs"), f"{vm_name}.json") if not os.path.exists(log_file_path): @@ -204,6 +235,51 @@ async def connect(vm_name: str): vms[vm_name]["active_turn_user"] = None vms[vm_name]["turn_queue"] = [] log.debug(f"({STATE.name} - {vm_name}) Turn queue is naturally exhausted.") + case ["png", "0", "0", "0", "0", initial_frame_b64]: + # Decode the base64 image data + initial_frame_data = base64.b64decode(initial_frame_b64) + + # Create an image from the decoded data + image = Image.open(BytesIO(initial_frame_data)) + + # Store the image in memory as a BytesIO object + framebuffer = BytesIO() + image.save(framebuffer, format="JPEG") + framebuffer.seek(0) + + # Assign the in-memory framebuffer to the VM's dictionary + vms[vm_name]["framebuffer"] = framebuffer + framebuffer_size = framebuffer.getbuffer().nbytes + log.info(f"({STATE.name} - {vm_name}) !!! WHOLE FRAME UPDATE !!! ({framebuffer_size} bytes)") + case ["png", "0", "0", x, y, rect_b64]: + # Decode the base64 image data for the rectangle + rect_data = base64.b64decode(rect_b64) + + # Create an image from the decoded rectangle data + rect_image = Image.open(BytesIO(rect_data)) + + # Update the in-memory framebuffer + if vms[vm_name]["framebuffer"]: + framebuffer = vms[vm_name]["framebuffer"] + framebuffer.seek(0) + framebuffer_image = Image.open(framebuffer) + + # Paste the rectangle onto the framebuffer at the specified coordinates + framebuffer_image.paste(rect_image, (int(x), int(y))) + + # Save the updated framebuffer back to the in-memory BytesIO object + framebuffer = BytesIO() + framebuffer_image.save(framebuffer, format="JPEG") + framebuffer.seek(0) + + # Update the VM's dictionary with the new framebuffer + vms[vm_name]["framebuffer"] = framebuffer + + # Log the updated framebuffer size + framebuffer_size = framebuffer.getbuffer().nbytes + log.debug(f"({STATE.name} - {vm_name}) Updated framebuffer size: {framebuffer_size} bytes") + else: + continue case ["turn", turn_time, count, current_turn, *queue]: if queue == vms[vm_name]["turn_queue"] and current_turn == vms[vm_name]["active_turn_user"]: #log.debug(f"({STATE.name} - {vm_name}) Incoming turn update matches the VM's state. Dropping update.")