Fix STT streaming to receive audio from gRPC client

- Fix streaming STT to accept audio chunks from the gRPC stream instead of the local microphone
- Add PCM audio buffer conversion for 16-bit, 16 kHz, mono audio
- Add StreamingResultHandler for safe recognition-callback handling
- Properly manage streaming session state and cleanup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Mathias Beaulieu-Duncan 2025-12-31 03:40:46 -05:00
parent 7655f1f0b8
commit f7b8fbfa36

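For context on how the reworked API is meant to be driven: a gRPC handler feeds network audio into the service and relays recognition updates back to the client. The sketch below shows that call pattern only — the AudioChunk type and the handleStreamingRecognize shape are illustrative stand-ins, not part of this commit.

import Foundation

// Hypothetical stand-in for the gRPC audio-chunk message.
struct AudioChunk {
    let pcmData: Data
}

// Illustrative handler: feed chunks in, relay updates out.
func handleStreamingRecognize(
    chunks: AsyncStream<AudioChunk>,
    service: SpeechToTextService
) async throws {
    let updates = await service.streamTranscribe()

    // Consume recognition updates concurrently with feeding audio.
    let consumer = Task {
        for try await update in updates {
            print(update) // forward partial/final transcripts on the response stream
        }
    }

    for await chunk in chunks {
        try await service.feedAudioChunk(chunk.pcmData) // 16-bit, 16 kHz, mono PCM
    }
    await service.endStreamingSession()
    try await consumer.value
}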

@@ -84,6 +84,10 @@ public actor SpeechToTextService {
     /// Streaming session state
     private var isStreamingActive: Bool = false
+    private var streamingRequest: SFSpeechAudioBufferRecognitionRequest?
+    private var streamingRecognizer: SFSpeechRecognizer?
+    private var streamingTask: SFSpeechRecognitionTask?
+    private var streamingContinuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation?

     public init() async {
         await checkAvailability()
@@ -108,19 +112,19 @@ public actor SpeechToTextService {
         return try await transcribeWithSFSpeechRecognizer(url: tempURL, config: config)
     }

-    /// Stream transcription from audio chunks
+    /// Stream transcription from audio chunks sent via gRPC
     public func streamTranscribe(
         config: TranscriptionConfig = .default
     ) -> AsyncThrowingStream<StreamingTranscriptionUpdate, Error> {
         AsyncThrowingStream { continuation in
             Task {
-                guard self.isAvailable else {
+                guard await self.isAvailable else {
                     continuation.finish(throwing: SpeechToTextError.notAvailable)
                     return
                 }

                 do {
-                    try await self.startStreamingWithSFSpeechRecognizer(config: config, continuation: continuation)
+                    try await self.startStreamingSession(config: config, continuation: continuation)
                 } catch {
                     continuation.finish(throwing: error)
                 }
@@ -128,17 +132,46 @@ public actor SpeechToTextService {
             }
         }
     }

-    /// Feed audio chunk for streaming transcription
+    /// Feed audio chunk for streaming transcription (PCM audio data)
     public func feedAudioChunk(_ chunk: Data) async throws {
-        guard isStreamingActive else {
+        guard isStreamingActive, let request = streamingRequest else {
             throw SpeechToTextError.transcriptionFailed("No active streaming session")
         }
-        // Audio chunk handling implemented in streaming methods
+
+        // Convert raw PCM data to audio buffer
+        // Assuming 16-bit PCM, mono, 16kHz (common format for speech)
+        let audioFormat = AVAudioFormat(
+            commonFormat: .pcmFormatInt16,
+            sampleRate: 16000,
+            channels: 1,
+            interleaved: true
+        )!
+
+        let frameCount = UInt32(chunk.count / 2) // 2 bytes per Int16 sample
+        guard let buffer = AVAudioPCMBuffer(pcmFormat: audioFormat, frameCapacity: frameCount) else {
+            throw SpeechToTextError.audioProcessingFailed("Failed to create audio buffer")
+        }
+        buffer.frameLength = frameCount
+
+        // Copy data into buffer
+        chunk.withUnsafeBytes { rawPtr in
+            if let int16Ptr = rawPtr.baseAddress?.assumingMemoryBound(to: Int16.self) {
+                buffer.int16ChannelData?[0].update(from: int16Ptr, count: Int(frameCount))
+            }
+        }
+
+        request.append(buffer)
     }

     /// End streaming session
     public func endStreamingSession() async {
+        streamingRequest?.endAudio()
         isStreamingActive = false
+        streamingRequest = nil
+        streamingTask = nil
+        streamingRecognizer = nil
+        streamingContinuation = nil
     }

     /// Get status information
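A sizing note on the PCM assumption above: at 16 kHz, 16-bit, mono, the stream runs at 16,000 × 2 = 32,000 bytes per second, so a 100 ms chunk is 3,200 bytes. Because frameCount is chunk.count / 2, a trailing odd byte would be silently dropped, so callers should send even-length chunks. A minimal framing sketch under those assumptions (makeChunks is illustrative, not in this commit):

import Foundation

// Slice a PCM byte stream into 100 ms chunks at 16 kHz / 16-bit / mono.
let bytesPerSecond = 16_000 * MemoryLayout<Int16>.size // 32,000 bytes/s
let chunkBytes = bytesPerSecond / 10                   // 3,200 bytes = 100 ms

func makeChunks(from pcm: Data) -> [Data] {
    precondition(pcm.count % 2 == 0, "16-bit PCM must have an even byte count")
    return stride(from: 0, to: pcm.count, by: chunkBytes).map { start in
        pcm.subdata(in: start..<min(start + chunkBytes, pcm.count))
    }
}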
@@ -258,8 +291,8 @@ public actor SpeechToTextService {
         }
     }

-    /// Start streaming with SFSpeechRecognizer
-    private func startStreamingWithSFSpeechRecognizer(
+    /// Start streaming session for gRPC audio chunks
+    private func startStreamingSession(
         config: TranscriptionConfig,
         continuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation
     ) async throws {
@@ -272,25 +305,53 @@ public actor SpeechToTextService {
             throw SpeechToTextError.notAvailable
         }

+        // Set up streaming state
         isStreamingActive = true
+        streamingRecognizer = recognizer
+        streamingContinuation = continuation

-        let audioEngine = AVAudioEngine()
         let request = SFSpeechAudioBufferRecognitionRequest()
         request.shouldReportPartialResults = true
+        streamingRequest = request

-        let inputNode = audioEngine.inputNode
-        let recordingFormat = inputNode.outputFormat(forBus: 0)
+        // Create wrapper to handle results safely
+        let service = self
+        let resultHandler = StreamingResultHandler(
+            config: config,
+            continuation: continuation,
+            onFinish: {
+                Task { await service.endStreamingSession() }
+            }
+        )

-        inputNode.installTap(onBus: 0, bufferSize: 1024, format: recordingFormat) { buffer, _ in
-            request.append(buffer)
+        streamingTask = recognizer.recognitionTask(with: request) { result, error in
+            resultHandler.handleResult(result: result, error: error)
+        }
+    }
+}
+
+// MARK: - Streaming Result Handler
+
+/// Wrapper to safely handle streaming recognition results
+private final class StreamingResultHandler: @unchecked Sendable {
+    private let config: TranscriptionConfig
+    private let continuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation
+    private let onFinish: () -> Void
+
+    init(
+        config: TranscriptionConfig,
+        continuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation,
+        onFinish: @escaping () -> Void
+    ) {
+        self.config = config
+        self.continuation = continuation
+        self.onFinish = onFinish
     }

-        audioEngine.prepare()
-        try audioEngine.start()
-
-        recognizer.recognitionTask(with: request) { result, error in
+    func handleResult(result: SFSpeechRecognitionResult?, error: Error?) {
         if let error = error {
             continuation.finish(throwing: SpeechToTextError.transcriptionFailed(error.localizedDescription))
+            onFinish()
             return
         }
@@ -319,19 +380,8 @@ public actor SpeechToTextService {
         continuation.yield(update)

         if result.isFinal {
-            audioEngine.stop()
-            inputNode.removeTap(onBus: 0)
             continuation.finish()
+            onFinish()
         }
     }
-
-        // Wait for streaming to end
-        while isStreamingActive {
-            try await Task.sleep(for: .milliseconds(100))
-        }
-
-        audioEngine.stop()
-        inputNode.removeTap(onBus: 0)
-        request.endAudio()
-    }
 }
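Design note: StreamingResultHandler exists because SFSpeechRecognizer invokes its result handler on an internal queue, outside the actor. The wrapper keeps that callback free of actor-isolated state — the continuation is Sendable and can be yielded to directly — and re-enters the actor only for cleanup through onFinish. A stripped-down sketch of the same pattern (Service and simulatedRecognizer are simplified stand-ins, not the commit's types):

import Foundation

// Stand-in for a framework API that calls back from a background queue.
func simulatedRecognizer(_ handler: @escaping @Sendable (String, Bool) -> Void) {
    DispatchQueue.global().async {
        handler("partial transcript", false)
        handler("final transcript", true)
    }
}

actor Service {
    private var active = false

    func start() -> AsyncThrowingStream<String, Error> {
        active = true
        return AsyncThrowingStream { continuation in
            // The callback never touches actor state directly; it yields into
            // the Sendable continuation and hops back onto the actor to clean up.
            simulatedRecognizer { text, isFinal in
                continuation.yield(text)
                if isFinal {
                    continuation.finish()
                    Task { await self.stop() }
                }
            }
        }
    }

    private func stop() { active = false }
}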