Vision-module-auto/claude_vision_auto/main.py
Svrnty 29ac2f0929 fix: Use PTY for proper terminal handling
- Replace subprocess.Popen with a PTY-backed child process (pty.openpty() + os.fork()) for proper TTY handling
- Fixes terminal 'bugging out' when spawning claude
- Properly handles terminal control sequences and colors
- Uses os.fork() and pty.openpty() for interactive shell support
- Maintains proper terminal restoration on exit

This fixes the issue where claude-vision would corrupt the terminal
when trying to spawn the claude subprocess.

Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Jean-Philippe Brule <jp@svrnty.io>
2025-10-30 02:14:33 -04:00

220 lines
7.5 KiB
Python

"""
Main entry point for Claude Vision Auto
"""
import sys
import os
import time
import select
import pty
import tty
import termios
import signal
import subprocess
from pathlib import Path
from . import config
from .screenshot import take_screenshot, cleanup_old_screenshots
from .vision_analyzer import VisionAnalyzer
def _write_all(fd, data):
    """Write all of *data* to file descriptor *fd*.

    ``os.write`` may accept fewer bytes than requested, so keep writing the
    unwritten tail until nothing is left.
    """
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


def run_claude_with_vision(args: list = None):
    """
    Run Claude Code with vision-based auto-approval

    Spawns ``claude`` on a pseudo-terminal, relays bytes between the user's
    terminal and the child, and, once the child has produced no output for
    ``config.IDLE_THRESHOLD`` seconds while the recent output contains one of
    ``config.APPROVAL_KEYWORDS``, takes a screenshot, asks the vision
    analyzer for a response and types that response into the child.

    Calls ``sys.exit(1)`` if the analyzer's connection test fails or the
    ``claude`` command cannot be found. Returns ``None`` once the child
    process has ended.

    Args:
        args: Command line arguments to pass to claude
    """
    args = args or []

    # Initialize vision analyzer
    analyzer = VisionAnalyzer()

    # Test connection
    print("[Claude Vision Auto] Testing Ollama connection...")
    if not analyzer.test_connection():
        print("[ERROR] Cannot connect to Ollama or model not available")
        print(f"Make sure Ollama is running and '{config.VISION_MODEL}' is installed")
        sys.exit(1)

    print("[Claude Vision Auto] Connected to Ollama")
    print(f"[Claude Vision Auto] Using model: {config.VISION_MODEL}")
    print(f"[Claude Vision Auto] Idle threshold: {config.IDLE_THRESHOLD}s")
    print()

    # Build command
    cmd = ['claude'] + args

    # Check if claude exists
    if subprocess.run(['which', 'claude'], capture_output=True).returncode != 0:
        print("[ERROR] 'claude' command not found")
        print("Make sure Claude Code CLI is installed")
        sys.exit(1)

    # Cleanup old screenshots
    cleanup_old_screenshots()

    stdin_fd = sys.stdin.fileno()
    stdout_fd = sys.stdout.fileno()

    # Save original terminal settings
    old_tty = termios.tcgetattr(stdin_fd)

    # Defined before the try so the cleanup below can tell whether the
    # pseudo-terminal was ever created.
    master_fd = None

    try:
        # Create pseudo-terminal
        master_fd, slave_fd = pty.openpty()

        # Fork process
        pid = os.fork()

        if pid == 0:
            # Child process - run claude
            try:
                os.close(master_fd)

                # Set up slave as stdin/stdout/stderr
                os.dup2(slave_fd, 0)
                os.dup2(slave_fd, 1)
                os.dup2(slave_fd, 2)
                if slave_fd > 2:
                    os.close(slave_fd)

                # Execute claude (does not return on success)
                os.execvp('claude', cmd)
            except OSError as exc:
                os.write(2, f"[ERROR] Failed to execute 'claude': {exc}\n".encode('utf-8'))
            finally:
                # Only reached when exec failed. Leave immediately so the
                # child never runs the parent's relay loop or cleanup code.
                os._exit(127)

        # Parent process - handle I/O and vision analysis
        os.close(slave_fd)

        # Set terminal to raw mode
        tty.setraw(stdin_fd)

        last_output_time = time.time()
        output_buffer = bytearray()
        watched_fds = [master_fd, stdin_fd]

        try:
            while True:
                # Check for data from claude or user
                readable, _, _ = select.select(watched_fds, [], [], 0.1)

                for fd in readable:
                    if fd == master_fd:
                        # Read from claude process
                        try:
                            data = os.read(master_fd, 1024)
                            if not data:
                                # Process ended
                                os.waitpid(pid, 0)
                                return

                            # Write to stdout
                            _write_all(stdout_fd, data)

                            # Add to buffer for pattern matching
                            output_buffer.extend(data)
                            last_output_time = time.time()

                            # Keep buffer reasonable size
                            if len(output_buffer) > config.OUTPUT_BUFFER_SIZE:
                                output_buffer = output_buffer[-config.OUTPUT_BUFFER_SIZE:]
                        except OSError:
                            # Process ended
                            os.waitpid(pid, 0)
                            return

                    elif fd == stdin_fd:
                        # Read from user input
                        data = os.read(stdin_fd, 1024)
                        if data:
                            # Forward to claude
                            _write_all(master_fd, data)
                        else:
                            # EOF on stdin: a descriptor at EOF is always
                            # reported readable, so stop watching it to
                            # avoid a busy loop.
                            watched_fds.remove(stdin_fd)

                # Check if idle (no output for threshold seconds)
                idle_time = time.time() - last_output_time
                if idle_time < config.IDLE_THRESHOLD:
                    continue

                # Check if buffer suggests we're waiting for input
                buffer_str = output_buffer.decode('utf-8', errors='ignore')

                # Look for approval keywords
                has_keywords = any(
                    keyword in buffer_str
                    for keyword in config.APPROVAL_KEYWORDS
                )

                if has_keywords:
                    # NOTE(review): the terminal is in raw mode here, so
                    # these "\n"-terminated status lines may render without
                    # a carriage return - confirm on a real terminal.
                    if config.DEBUG:
                        sys.stderr.write("\n[DEBUG] Approval keywords detected in buffer\n")
                        sys.stderr.flush()

                    sys.stderr.write("\n[Vision] Analyzing prompt...\n")
                    sys.stderr.flush()

                    # Take screenshot
                    screenshot_path = take_screenshot()

                    if screenshot_path:
                        # Analyze with vision
                        response = analyzer.analyze_screenshot(screenshot_path)

                        if response:
                            sys.stderr.write(f"[Vision] Response: {response}\n")
                            sys.stderr.flush()

                            if response.upper() != "WAIT":
                                # Send response
                                time.sleep(config.RESPONSE_DELAY)
                                os.write(master_fd, f"{response}\n".encode('utf-8'))

                                # Clear buffer
                                output_buffer.clear()
                                last_output_time = time.time()

                                sys.stderr.write("[Vision] Response sent\n")
                                sys.stderr.flush()
                            else:
                                sys.stderr.write("[Vision] No action needed (WAIT)\n")
                                sys.stderr.flush()
                        else:
                            sys.stderr.write("[Vision] Analysis failed, waiting for manual input\n")
                            sys.stderr.flush()

                        # Clean up screenshot (best effort)
                        try:
                            Path(screenshot_path).unlink()
                        except OSError:
                            pass
                    else:
                        sys.stderr.write("[Vision] Screenshot failed, waiting for manual input\n")
                        sys.stderr.flush()

                # Reset idle detection
                last_output_time = time.time()

        except KeyboardInterrupt:
            # Kill child process
            try:
                os.kill(pid, signal.SIGTERM)
                os.waitpid(pid, 0)
            except (ProcessLookupError, ChildProcessError):
                # Child already exited or was already reaped.
                pass

    finally:
        try:
            # Restore terminal settings
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_tty)
        finally:
            # Close master fd if it was opened
            if master_fd is not None:
                try:
                    os.close(master_fd)
                except OSError:
                    pass
def main():
    """Command-line entry point: forward every CLI argument to claude."""
    # sys.argv[0] is the program name; everything after it belongs to claude.
    run_claude_with_vision(sys.argv[1:])
# Run the CLI only when this file is executed as a script, not on import.
if "__main__" == __name__:
    main()