"""
Main entry point for Claude Vision Auto
"""

import sys
import os
import time
import select
import pty
import tty
import termios
import signal
import subprocess
from pathlib import Path

from . import config
from .screenshot import take_screenshot, cleanup_old_screenshots
from .vision_analyzer import VisionAnalyzer


def _log(message: str) -> None:
    """Write a status message to stderr and flush it immediately."""
    sys.stderr.write(message)
    sys.stderr.flush()


def _write_all(fd: int, data) -> None:
    """Write all of ``data`` to ``fd``, retrying after short writes."""
    # os.write returns the number of bytes actually written, which may be
    # less than len(data); loop until nothing is left.
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


def _exec_claude(master_fd: int, slave_fd: int, cmd: list) -> None:
    """Child side of the fork: attach the pty slave to stdio and exec claude.

    Never returns. If the exec (or the fd setup) fails, the child exits
    immediately with status 127 instead of unwinding into the parent's
    code path and cleanup handlers.
    """
    try:
        os.close(master_fd)

        # Set up slave as stdin/stdout/stderr
        os.dup2(slave_fd, 0)
        os.dup2(slave_fd, 1)
        os.dup2(slave_fd, 2)
        if slave_fd > 2:
            os.close(slave_fd)

        # Execute claude (replaces this process image on success)
        os.execvp('claude', cmd)
    except OSError as exc:
        message = f"[ERROR] Failed to start claude: {exc}\n"
        os.write(2, message.encode('utf-8', errors='replace'))
    finally:
        # Only reached when exec did not happen. os._exit skips the
        # parent's finally blocks and atexit handlers in this child.
        os._exit(127)


def _terminate_child(pid: int) -> None:
    """Send SIGTERM to the child and reap it, tolerating an already-gone child."""
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        pass
    try:
        os.waitpid(pid, 0)
    except ChildProcessError:
        pass


def _answer_prompt(analyzer, master_fd: int, output_buffer: bytearray) -> None:
    """Take a screenshot, ask the vision model, and forward its answer to claude.

    Args:
        analyzer: Object providing ``analyze_screenshot(path)``.
        master_fd: Master side of the pty connected to claude.
        output_buffer: Recent claude output; cleared in place once a
            response has been sent.
    """
    if config.DEBUG:
        _log("\n[DEBUG] Approval keywords detected in buffer\n")
    _log("\n[Vision] Analyzing prompt...\n")

    screenshot_path = take_screenshot()
    if not screenshot_path:
        _log("[Vision] Screenshot failed, waiting for manual input\n")
        return

    try:
        response = analyzer.analyze_screenshot(screenshot_path)
        if not response:
            _log("[Vision] Analysis failed, waiting for manual input\n")
            return

        _log(f"[Vision] Response: {response}\n")
        if response.upper() == "WAIT":
            _log("[Vision] No action needed (WAIT)\n")
            return

        # Send response
        time.sleep(config.RESPONSE_DELAY)
        _write_all(master_fd, f"{response}\n".encode('utf-8'))

        # Clear buffer so the same prompt is not answered twice
        output_buffer.clear()
        _log("[Vision] Response sent\n")
    finally:
        # Clean up screenshot (best effort), even if analysis raised
        try:
            Path(screenshot_path).unlink()
        except OSError:
            pass


def _pump_io(master_fd: int, pid: int, analyzer) -> None:
    """Relay bytes between the terminal and claude until claude exits.

    Forwards claude's output to stdout and the user's keystrokes to claude.
    When claude has produced no output for ``config.IDLE_THRESHOLD`` seconds
    and the recent output contains one of ``config.APPROVAL_KEYWORDS``, the
    vision analyzer is consulted. Reaps the child before returning.
    """
    stdin_fd = sys.stdin.fileno()
    stdout_fd = sys.stdout.fileno()
    watched = [master_fd, stdin_fd]

    output_buffer = bytearray()
    # Monotonic clock: idle detection must not be affected by wall-clock jumps.
    last_output_time = time.monotonic()

    while True:
        # Check for data from claude or user
        readable, _, _ = select.select(watched, [], [], 0.1)

        if master_fd in readable:
            # Read from claude process; an OSError on the pty master is
            # treated the same as EOF (process ended).
            try:
                data = os.read(master_fd, 1024)
            except OSError:
                data = b''

            if not data:
                os.waitpid(pid, 0)
                return

            # Write to stdout
            _write_all(stdout_fd, data)

            # Add to buffer for pattern matching
            output_buffer.extend(data)
            last_output_time = time.monotonic()

            # Keep buffer reasonable size (trim in place, keep the tail)
            if len(output_buffer) > config.OUTPUT_BUFFER_SIZE:
                del output_buffer[:-config.OUTPUT_BUFFER_SIZE]

        if stdin_fd in readable:
            # Read from user input
            data = os.read(stdin_fd, 1024)
            if data:
                # Forward to claude
                _write_all(master_fd, data)
            else:
                # EOF on stdin: it would stay "readable" forever and turn
                # the loop into a busy spin, so stop watching it.
                watched.remove(stdin_fd)

        # Check if idle (no output for threshold seconds)
        if time.monotonic() - last_output_time >= config.IDLE_THRESHOLD:
            # Check if buffer suggests we're waiting for input
            buffer_str = output_buffer.decode('utf-8', errors='ignore')
            if any(keyword in buffer_str for keyword in config.APPROVAL_KEYWORDS):
                _answer_prompt(analyzer, master_fd, output_buffer)

            # Reset idle detection so the (unchanged) buffer is scanned at
            # most once per idle period rather than on every select timeout.
            last_output_time = time.monotonic()


def run_claude_with_vision(args: list = None):
    """
    Run Claude Code with vision-based auto-approval

    Starts ``claude`` on a pseudo-terminal, relays I/O between it and the
    real terminal (switched to raw mode for the duration), and consults the
    vision analyzer whenever claude appears to be waiting at an approval
    prompt. The original terminal settings are restored on exit.

    Calls ``sys.exit(1)`` if the analyzer's connection test fails or the
    ``claude`` command cannot be found.

    Args:
        args: Command line arguments to pass to claude
    """
    args = args or []

    # Initialize vision analyzer
    analyzer = VisionAnalyzer()

    # Test connection
    print("[Claude Vision Auto] Testing Ollama connection...")
    if not analyzer.test_connection():
        print("[ERROR] Cannot connect to Ollama or model not available")
        print(f"Make sure Ollama is running and '{config.VISION_MODEL}' is installed")
        sys.exit(1)

    print("[Claude Vision Auto] Connected to Ollama")
    print(f"[Claude Vision Auto] Using model: {config.VISION_MODEL}")
    print(f"[Claude Vision Auto] Idle threshold: {config.IDLE_THRESHOLD}s")
    print()

    # Build command
    cmd = ['claude'] + args

    # Check if claude exists
    if subprocess.run(['which', 'claude'], capture_output=True).returncode != 0:
        print("[ERROR] 'claude' command not found")
        print("Make sure Claude Code CLI is installed")
        sys.exit(1)

    # Cleanup old screenshots
    cleanup_old_screenshots()

    # Save original terminal settings
    old_tty = termios.tcgetattr(sys.stdin)

    # Bound before the try so the finally block can tell whether the pty
    # was ever opened.
    master_fd = None

    try:
        # Create pseudo-terminal
        master_fd, slave_fd = pty.openpty()

        # Fork process
        pid = os.fork()

        if pid == 0:
            # Child process - run claude (never returns)
            _exec_claude(master_fd, slave_fd, cmd)

        # Parent process - handle I/O and vision analysis
        os.close(slave_fd)

        # Set terminal to raw mode
        tty.setraw(sys.stdin.fileno())

        try:
            _pump_io(master_fd, pid, analyzer)
        except KeyboardInterrupt:
            # Kill child process
            _terminate_child(pid)

    finally:
        # Restore terminal settings
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

        # Close master fd if it was opened
        if master_fd is not None:
            try:
                os.close(master_fd)
            except OSError:
                pass


def main():
    """CLI entry point"""
    args = sys.argv[1:]
    run_claude_with_vision(args)


if __name__ == '__main__':
    main()