fix: Use PTY for proper terminal handling

- Replace subprocess.Popen with pty.fork() for proper TTY handling
- Fixes terminal 'bugging out' when spawning claude
- Properly handles terminal control sequences and colors
- Uses os.fork() and pty.openpty() for interactive shell support
- Maintains proper terminal restoration on exit

This fixes the issue where claude-vision would corrupt the terminal
when trying to spawn the claude subprocess.

Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Jean-Philippe Brule <jp@svrnty.io>
This commit is contained in:
Svrnty 2025-10-30 02:14:33 -04:00
parent 52b0813b64
commit 29ac2f0929

View File

@ -3,8 +3,13 @@ Main entry point for Claude Vision Auto
""" """
import sys import sys
import os
import time import time
import select import select
import pty
import tty
import termios
import signal
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@ -40,118 +45,168 @@ def run_claude_with_vision(args: list = None):
# Build command # Build command
cmd = ['claude'] + args cmd = ['claude'] + args
# Start Claude Code process # Check if claude exists
try: if not subprocess.run(['which', 'claude'], capture_output=True).returncode == 0:
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0
)
except FileNotFoundError:
print("[ERROR] 'claude' command not found") print("[ERROR] 'claude' command not found")
print("Make sure Claude Code CLI is installed") print("Make sure Claude Code CLI is installed")
sys.exit(1) sys.exit(1)
last_output_time = time.time()
output_buffer = bytearray()
# Cleanup old screenshots # Cleanup old screenshots
cleanup_old_screenshots() cleanup_old_screenshots()
# Save original terminal settings
old_tty = termios.tcgetattr(sys.stdin)
try: try:
while True: # Create pseudo-terminal
# Check if there's data to read master_fd, slave_fd = pty.openpty()
readable, _, _ = select.select([process.stdout], [], [], 0.1)
if readable: # Fork process
char = process.stdout.read(1) pid = os.fork()
if not char:
# Process ended
break
# Print to terminal if pid == 0:
sys.stdout.buffer.write(char) # Child process - run claude
sys.stdout.buffer.flush() os.close(master_fd)
output_buffer.extend(char) # Set up slave as stdin/stdout/stderr
last_output_time = time.time() os.dup2(slave_fd, 0)
os.dup2(slave_fd, 1)
os.dup2(slave_fd, 2)
# Keep buffer reasonable size if slave_fd > 2:
if len(output_buffer) > config.OUTPUT_BUFFER_SIZE: os.close(slave_fd)
output_buffer = output_buffer[-config.OUTPUT_BUFFER_SIZE:]
# Check if idle (no output for threshold seconds) # Execute claude
idle_time = time.time() - last_output_time os.execvp('claude', cmd)
else:
# Parent process - handle I/O and vision analysis
os.close(slave_fd)
if idle_time >= config.IDLE_THRESHOLD: # Set terminal to raw mode
# Check if buffer suggests we're waiting for input tty.setraw(sys.stdin.fileno())
buffer_str = output_buffer.decode('utf-8', errors='ignore')
# Look for approval keywords last_output_time = time.time()
has_keywords = any( output_buffer = bytearray()
keyword in buffer_str
for keyword in config.APPROVAL_KEYWORDS
)
if has_keywords: try:
if config.DEBUG: while True:
print("\n[DEBUG] Approval keywords detected in buffer") # Check for data from claude or user
readable, _, _ = select.select(
[master_fd, sys.stdin.fileno()],
[],
[],
0.1
)
print("\n[Vision] Analyzing prompt...", file=sys.stderr) for fd in readable:
if fd == master_fd:
# Read from claude process
try:
data = os.read(master_fd, 1024)
if not data:
# Process ended
os.waitpid(pid, 0)
return
# Take screenshot # Write to stdout
screenshot_path = take_screenshot() os.write(sys.stdout.fileno(), data)
if screenshot_path: # Add to buffer for pattern matching
# Analyze with vision output_buffer.extend(data)
response = analyzer.analyze_screenshot(screenshot_path)
if response:
print(f"[Vision] Response: {response}", file=sys.stderr)
if response and response.upper() != "WAIT":
# Send response
time.sleep(config.RESPONSE_DELAY)
process.stdin.write(f"{response}\n".encode('utf-8'))
process.stdin.flush()
# Clear buffer
output_buffer.clear()
last_output_time = time.time() last_output_time = time.time()
print("[Vision] Response sent", file=sys.stderr) # Keep buffer reasonable size
if len(output_buffer) > config.OUTPUT_BUFFER_SIZE:
output_buffer = output_buffer[-config.OUTPUT_BUFFER_SIZE:]
except OSError:
# Process ended
os.waitpid(pid, 0)
return
elif fd == sys.stdin.fileno():
# Read from user input
data = os.read(sys.stdin.fileno(), 1024)
if data:
# Forward to claude
os.write(master_fd, data)
# Check if idle (no output for threshold seconds)
idle_time = time.time() - last_output_time
if idle_time >= config.IDLE_THRESHOLD:
# Check if buffer suggests we're waiting for input
buffer_str = output_buffer.decode('utf-8', errors='ignore')
# Look for approval keywords
has_keywords = any(
keyword in buffer_str
for keyword in config.APPROVAL_KEYWORDS
)
if has_keywords:
if config.DEBUG:
sys.stderr.write("\n[DEBUG] Approval keywords detected in buffer\n")
sys.stderr.flush()
sys.stderr.write("\n[Vision] Analyzing prompt...\n")
sys.stderr.flush()
# Take screenshot
screenshot_path = take_screenshot()
if screenshot_path:
# Analyze with vision
response = analyzer.analyze_screenshot(screenshot_path)
if response:
sys.stderr.write(f"[Vision] Response: {response}\n")
sys.stderr.flush()
if response and response.upper() != "WAIT":
# Send response
time.sleep(config.RESPONSE_DELAY)
os.write(master_fd, f"{response}\n".encode('utf-8'))
# Clear buffer
output_buffer.clear()
last_output_time = time.time()
sys.stderr.write("[Vision] Response sent\n")
sys.stderr.flush()
else:
sys.stderr.write("[Vision] No action needed (WAIT)\n")
sys.stderr.flush()
else:
sys.stderr.write("[Vision] Analysis failed, waiting for manual input\n")
sys.stderr.flush()
# Clean up screenshot
try:
Path(screenshot_path).unlink()
except Exception:
pass
else: else:
print("[Vision] No action needed (WAIT)", file=sys.stderr) sys.stderr.write("[Vision] Screenshot failed, waiting for manual input\n")
else: sys.stderr.flush()
print("[Vision] Analysis failed, waiting for manual input", file=sys.stderr)
# Clean up screenshot # Reset idle detection
try: last_output_time = time.time()
Path(screenshot_path).unlink()
except Exception:
pass
else:
print("[Vision] Screenshot failed, waiting for manual input", file=sys.stderr)
# Reset idle detection except KeyboardInterrupt:
last_output_time = time.time() # Kill child process
os.kill(pid, signal.SIGTERM)
# Check if process is still running os.waitpid(pid, 0)
if process.poll() is not None:
break
except KeyboardInterrupt:
print("\n[Claude Vision Auto] Interrupted by user")
process.terminate()
process.wait()
sys.exit(130)
finally: finally:
# Wait for process to finish # Restore terminal settings
exit_code = process.wait() termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
sys.exit(exit_code)
# Close master fd if still open
try:
os.close(master_fd)
except:
pass
def main(): def main():