diff --git a/Sources/AppleIntelligenceCore/Services/SpeechToTextService.swift b/Sources/AppleIntelligenceCore/Services/SpeechToTextService.swift
index 0122cc3..a9e8d74 100644
--- a/Sources/AppleIntelligenceCore/Services/SpeechToTextService.swift
+++ b/Sources/AppleIntelligenceCore/Services/SpeechToTextService.swift
@@ -84,6 +84,10 @@ public actor SpeechToTextService {
 
     /// Streaming session state
     private var isStreamingActive: Bool = false
+    private var streamingRequest: SFSpeechAudioBufferRecognitionRequest?
+    private var streamingRecognizer: SFSpeechRecognizer?
+    private var streamingTask: SFSpeechRecognitionTask?
+    private var streamingContinuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation?
 
     public init() async {
         await checkAvailability()
@@ -108,19 +112,19 @@ public actor SpeechToTextService {
         return try await transcribeWithSFSpeechRecognizer(url: tempURL, config: config)
     }
 
-    /// Stream transcription from audio chunks
+    /// Stream transcription from audio chunks sent via gRPC
     public func streamTranscribe(
         config: TranscriptionConfig = .default
     ) -> AsyncThrowingStream<StreamingTranscriptionUpdate, Error> {
         AsyncThrowingStream { continuation in
             Task {
-                guard self.isAvailable else {
+                guard await self.isAvailable else {
                     continuation.finish(throwing: SpeechToTextError.notAvailable)
                     return
                 }
 
                 do {
-                    try await self.startStreamingWithSFSpeechRecognizer(config: config, continuation: continuation)
+                    try await self.startStreamingSession(config: config, continuation: continuation)
                 } catch {
                     continuation.finish(throwing: error)
                 }
@@ -128,17 +132,46 @@ public actor SpeechToTextService {
         }
     }
 
-    /// Feed audio chunk for streaming transcription
+    /// Feed audio chunk for streaming transcription (PCM audio data)
     public func feedAudioChunk(_ chunk: Data) async throws {
-        guard isStreamingActive else {
+        guard isStreamingActive, let request = streamingRequest else {
             throw SpeechToTextError.transcriptionFailed("No active streaming session")
         }
-        // Audio chunk handling implemented in streaming methods
+
+        // Convert raw PCM data to audio buffer
+        // Assuming 16-bit PCM, mono, 16kHz (common format for speech)
+        let audioFormat = AVAudioFormat(
+            commonFormat: .pcmFormatInt16,
+            sampleRate: 16000,
+            channels: 1,
+            interleaved: true
+        )!
+
+        let frameCount = UInt32(chunk.count / 2) // 2 bytes per Int16 sample
+        guard let buffer = AVAudioPCMBuffer(pcmFormat: audioFormat, frameCapacity: frameCount) else {
+            throw SpeechToTextError.audioProcessingFailed("Failed to create audio buffer")
+        }
+
+        buffer.frameLength = frameCount
+
+        // Copy data into buffer
+        chunk.withUnsafeBytes { rawPtr in
+            if let int16Ptr = rawPtr.baseAddress?.assumingMemoryBound(to: Int16.self) {
+                buffer.int16ChannelData?[0].update(from: int16Ptr, count: Int(frameCount))
+            }
+        }
+
+        request.append(buffer)
     }
 
     /// End streaming session
     public func endStreamingSession() async {
+        streamingRequest?.endAudio()
         isStreamingActive = false
+        streamingRequest = nil
+        streamingTask = nil
+        streamingRecognizer = nil
+        streamingContinuation = nil
     }
 
     /// Get status information
@@ -258,8 +291,8 @@ public actor SpeechToTextService {
         }
     }
 
-    /// Start streaming with SFSpeechRecognizer
-    private func startStreamingWithSFSpeechRecognizer(
+    /// Start streaming session for gRPC audio chunks
+    private func startStreamingSession(
        config: TranscriptionConfig,
        continuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation
    ) async throws {
@@ -272,66 +305,83 @@ public actor SpeechToTextService {
             throw SpeechToTextError.notAvailable
         }
 
+        // Set up streaming state
         isStreamingActive = true
+        streamingRecognizer = recognizer
+        streamingContinuation = continuation
 
-        let audioEngine = AVAudioEngine()
         let request = SFSpeechAudioBufferRecognitionRequest()
         request.shouldReportPartialResults = true
+        streamingRequest = request
 
-        let inputNode = audioEngine.inputNode
-        let recordingFormat = inputNode.outputFormat(forBus: 0)
-
-        inputNode.installTap(onBus: 0, bufferSize: 1024, format: recordingFormat) { buffer, _ in
-            request.append(buffer)
-        }
-
-        audioEngine.prepare()
-        try audioEngine.start()
-
-        recognizer.recognitionTask(with: request) { result, error in
-            if let error = error {
-                continuation.finish(throwing: SpeechToTextError.transcriptionFailed(error.localizedDescription))
-                return
+        // Create wrapper to handle results safely
+        let service = self
+        let resultHandler = StreamingResultHandler(
+            config: config,
+            continuation: continuation,
+            onFinish: {
+                Task { await service.endStreamingSession() }
             }
+        )
 
-            guard let result = result else { return }
-
-            let transcription = result.bestTranscription
-            var segments: [TranscriptionSegmentResult] = []
-
-            if config.enableTimestamps {
-                for segment in transcription.segments {
-                    segments.append(TranscriptionSegmentResult(
-                        text: segment.substring,
-                        startTime: Float(segment.timestamp),
-                        endTime: Float(segment.timestamp + segment.duration),
-                        confidence: segment.confidence
-                    ))
-                }
-            }
-
-            let update = StreamingTranscriptionUpdate(
-                partialText: transcription.formattedString,
-                isFinal: result.isFinal,
-                finalText: result.isFinal ? transcription.formattedString : nil,
-                segments: segments
-            )
-            continuation.yield(update)
-
-            if result.isFinal {
-                audioEngine.stop()
-                inputNode.removeTap(onBus: 0)
-                continuation.finish()
-            }
+        streamingTask = recognizer.recognitionTask(with: request) { result, error in
+            resultHandler.handleResult(result: result, error: error)
+        }
+    }
+}
+
+// MARK: - Streaming Result Handler
+
+/// Wrapper to safely handle streaming recognition results
+private final class StreamingResultHandler: @unchecked Sendable {
+    private let config: TranscriptionConfig
+    private let continuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation
+    private let onFinish: () -> Void
+
+    init(
+        config: TranscriptionConfig,
+        continuation: AsyncThrowingStream<StreamingTranscriptionUpdate, Error>.Continuation,
+        onFinish: @escaping () -> Void
+    ) {
+        self.config = config
+        self.continuation = continuation
+        self.onFinish = onFinish
+    }
+
+    func handleResult(result: SFSpeechRecognitionResult?, error: Error?) {
+        if let error = error {
+            continuation.finish(throwing: SpeechToTextError.transcriptionFailed(error.localizedDescription))
+            onFinish()
+            return
+        }
+
+        guard let result = result else { return }
+
+        let transcription = result.bestTranscription
+        var segments: [TranscriptionSegmentResult] = []
+
+        if config.enableTimestamps {
+            for segment in transcription.segments {
+                segments.append(TranscriptionSegmentResult(
+                    text: segment.substring,
+                    startTime: Float(segment.timestamp),
+                    endTime: Float(segment.timestamp + segment.duration),
+                    confidence: segment.confidence
+                ))
+            }
+        }
+
+        let update = StreamingTranscriptionUpdate(
+            partialText: transcription.formattedString,
+            isFinal: result.isFinal,
+            finalText: result.isFinal ? transcription.formattedString : nil,
+            segments: segments
+        )
+        continuation.yield(update)
+
+        if result.isFinal {
+            continuation.finish()
+            onFinish()
         }
-
-        // Wait for streaming to end
-        while isStreamingActive {
-            try await Task.sleep(for: .milliseconds(100))
-        }
-
-        audioEngine.stop()
-        inputNode.removeTap(onBus: 0)
-        request.endAudio()
     }
 }
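
For context, a minimal caller-side sketch of how a gRPC handler might drive the streaming API introduced in this diff. The handler function and the requestStream of raw PCM chunks are hypothetical stand-ins for the generated gRPC types; streamTranscribe, feedAudioChunk, and endStreamingSession are the service methods shown above, and the chunks are assumed to be 16-bit, 16 kHz mono PCM as the new feedAudioChunk expects.

// Sketch only: handleStreamingTranscription and requestStream are hypothetical;
// the SpeechToTextService methods come from the diff above.
func handleStreamingTranscription(
    service: SpeechToTextService,
    requestStream: AsyncStream<Data>  // raw 16-bit, 16 kHz mono PCM chunks from gRPC
) async throws {
    // Start the recognition stream before feeding audio so no updates are missed.
    // Note: the session is set up asynchronously inside streamTranscribe, so chunks
    // arriving before it is active will make feedAudioChunk throw.
    let updates = await service.streamTranscribe()

    // Feed incoming chunks concurrently; signal end-of-audio when the client stops sending.
    let feeder = Task {
        for await chunk in requestStream {
            try await service.feedAudioChunk(chunk)
        }
        await service.endStreamingSession()
    }

    // Forward partial and final transcripts back to the client.
    for try await update in updates {
        if update.isFinal {
            print("final: \(update.finalText ?? update.partialText)")
        } else {
            print("partial: \(update.partialText)")
        }
    }

    feeder.cancel()
}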