#!/usr/bin/env python3
# Date: 2025-03-30

"""

ChatGPT for Rotary Phone

-------------------------------------------------------------

https://en.polluxlabs.net

MIT License

Copyright (c) 2025 Frederik Kumbartzki

Permission is hereby granted, free of charge, to any person obtaining a copy

of this software and associated documentation files (the "Software"), to deal

in the Software without restriction, including without limitation the rights

to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

copies of the Software, and to permit persons to whom the Software is

furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all

copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE

AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE

SOFTWARE.

"""

import os

import sys

import time

import threading

from queue import Queue

from pathlib import Path

# Audio and speech libraries

os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"

import pygame

import pyaudio

import numpy as np

import wave

from openai import OpenAI

# OpenAI API Key

from dotenv import load_dotenv

load_dotenv()

# Look the API key up in the process environment; load_dotenv() has already
# run at this point.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# A missing or empty key is fatal: report it and exit with status 1.
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    print("Error: OPENAI_API_KEY not found.")
    sys.exit(1)

# Hardware libraries

from gpiozero import Button

# Constants and configurations

# Directory that holds the pre-recorded audio prompts.
AUDIO_DIR = "/home/pi/Desktop/callGPT"

# Logical sound name -> absolute path of the corresponding MP3 file.
AUDIO_FILES = {
    sound_name: f"{AUDIO_DIR}/{file_name}"
    for sound_name, file_name in (
        ("tone", "a440.mp3"),
        ("try_again", "tryagain.mp3"),
        ("error", "error.mp3"),
    )
}

DIAL_PIN = 23  # GPIO pin for rotary dial
SWITCH_PIN = 17  # GPIO pin for hook switch

# Audio parameters
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 16000
CHUNK_SIZE = 1024
SILENCE_THRESHOLD = 500
MAX_SILENCE_CHUNKS = 20  # 20 chunks * 1024 frames / 16000 Hz ~= 1.3 seconds of silence
DEBOUNCE_TIME = 0.1  # Time in seconds for debouncing button inputs

class AudioManager:
    """Manages audio playback: one-shot files and the looping dial tone.

    Attributes:
        playing_audio: Flag polled by the tone thread; the tone keeps looping
            for as long as it is True.
        audio_thread: Thread currently looping the dial tone, or None.
        temp_dir: "temp_audio" directory next to this script (created on init).
        sounds: Preloaded ``pygame.mixer.Sound`` objects keyed by the names
            used in ``AUDIO_FILES``; entries that failed to load are absent.
    """

    def __init__(self):
        pygame.mixer.init(frequency=44100, buffer=2048)
        self.playing_audio = False
        self.audio_thread = None

        # Create temp directory
        self.temp_dir = Path(__file__).parent / "temp_audio"
        self.temp_dir.mkdir(exist_ok=True)

        # Preload sounds; a sound that cannot be loaded is reported and
        # skipped so the rest of the phone keeps working.
        self.sounds = {}
        for name, path in AUDIO_FILES.items():
            try:
                self.sounds[name] = pygame.mixer.Sound(path)
            except Exception:
                print(f"Error loading {path}")

    @staticmethod
    def _wait_while(is_busy):
        """Block until ``is_busy()`` returns a falsy value, polling ~30x/s."""
        clock = pygame.time.Clock()
        while is_busy():
            clock.tick(30)

    def play_file(self, file_path, wait=True):
        """Play an audio file.

        The file is first played as a ``pygame.mixer.Sound``; if that fails,
        playback falls back to ``pygame.mixer.music``.

        Args:
            file_path: Path of the audio file to play.
            wait: When True, block until playback has finished.
        """
        try:
            channel = pygame.mixer.Sound(file_path).play()
        except Exception:
            # Sound-based playback failed: fall back to the music channel.
            pygame.mixer.music.load(file_path)
            pygame.mixer.music.play()
            if wait:
                self._wait_while(pygame.mixer.music.get_busy)
        else:
            # The wait loop is deliberately outside the try block so that an
            # interrupt while waiting does not trigger the fallback and
            # replay the file.
            if wait and channel:
                self._wait_while(channel.get_busy)

    def start_continuous_tone(self):
        """Start looping the dial tone in a background daemon thread.

        A tone thread that is still running is stopped first.
        """
        previous = self.audio_thread
        if previous and previous.is_alive():
            self.playing_audio = False
            previous.join(timeout=1.0)

        # Raise the flag only after the old thread has been told to stop;
        # otherwise the new thread would see False and end immediately.
        self.playing_audio = True
        self.audio_thread = threading.Thread(target=self._play_continuous_tone)
        self.audio_thread.daemon = True
        self.audio_thread.start()

    def _play_continuous_tone(self):
        """Thread body: loop the tone until ``playing_audio`` becomes False."""
        try:
            if "tone" in self.sounds:
                self.sounds["tone"].play(loops=-1)
                while self.playing_audio:
                    time.sleep(0.1)
                self.sounds["tone"].stop()
            else:
                # The tone was not preloaded: stream it via the music channel.
                pygame.mixer.music.load(AUDIO_FILES["tone"])
                pygame.mixer.music.play(loops=-1)
                while self.playing_audio:
                    time.sleep(0.1)
                pygame.mixer.music.stop()
        except Exception as e:
            print(f"Error during tone playback: {e}")

    def stop_continuous_tone(self):
        """Stop the dial tone (both the preloaded sound and the music channel)."""
        self.playing_audio = False
        if "tone" in self.sounds:
            self.sounds["tone"].stop()
        if pygame.mixer.get_init() and pygame.mixer.music.get_busy():
            pygame.mixer.music.stop()

class SpeechRecognizer:
    """Handles real-time speech recognition using OpenAI's Whisper API.

    Attributes:
        client: OpenAI client used for transcription requests.
        audio: ``pyaudio.PyAudio`` instance (None after ``cleanup``).
        stream: Input stream, opened lazily on first capture (None until then
            and after ``cleanup``).
    """

    def __init__(self, openai_client):
        self.client = openai_client
        self.audio = pyaudio.PyAudio()
        self.stream = None

    def capture_and_transcribe(self):
        """Record one utterance from the microphone and transcribe it.

        Returns:
            The transcribed text, or None if capturing stopped before an
            utterance could be transcribed.
        """
        # Setup audio stream if not already initialized
        if not self.stream:
            self.stream = self.audio.open(
                format=AUDIO_FORMAT,
                channels=CHANNELS,
                rate=SAMPLE_RATE,
                input=True,
                frames_per_buffer=CHUNK_SIZE,
            )

        audio_queue = Queue()
        stop_event = threading.Event()

        capture_thread = threading.Thread(
            target=self._capture_audio,
            args=(audio_queue, stop_event),
        )
        capture_thread.daemon = True
        capture_thread.start()

        try:
            return self._process_audio(audio_queue, stop_event)
        finally:
            # Always stop the capture thread, even if processing raised.
            stop_event.set()
            capture_thread.join()

    def _capture_audio(self, queue, stop_event):
        """Thread body: push microphone chunks onto ``queue`` until stopped.

        ``stop_event`` is always set on exit, so a capture failure (for
        example the stream being closed from another thread) also ends
        ``_process_audio`` instead of leaving it waiting forever.
        """
        try:
            while not stop_event.is_set():
                queue.put(self.stream.read(CHUNK_SIZE, exception_on_overflow=False))
        except Exception as e:
            print(f"Error during audio capture: {e}")
        finally:
            stop_event.set()

    def _process_audio(self, queue, stop_event):
        """Consume chunks, detect the end of an utterance and transcribe it.

        An utterance starts with the first chunk whose mean absolute
        amplitude exceeds ``SILENCE_THRESHOLD`` and ends after more than
        ``MAX_SILENCE_CHUNKS`` consecutive quieter chunks. Everything
        recorded up to that point is written to a WAV file and transcribed.
        If transcription fails, the recording is discarded and listening
        continues.

        Returns:
            The transcription text, or None once ``stop_event`` is set.
        """
        # Function-scope import; the `queue` parameter is a Queue instance and
        # is unrelated to the `queue` module named here.
        from queue import Empty

        chunks = []
        speaking = False
        silence_counter = 0

        while not stop_event.is_set():
            try:
                # Block briefly instead of spinning on queue.empty().
                chunk = queue.get(timeout=0.1)
            except Empty:
                continue

            # Widen to int32 first: abs(-32768) overflows in int16.
            samples = np.frombuffer(chunk, dtype=np.int16).astype(np.int32)
            volume = np.abs(samples).mean()

            if volume > SILENCE_THRESHOLD:
                speaking = True
                silence_counter = 0
            elif speaking:
                silence_counter += 1

            chunks.append(chunk)

            # Process if we've detected end of speech
            if speaking and silence_counter > MAX_SILENCE_CHUNKS:
                print("Processing speech...")
                temp_file = Path(__file__).parent / "temp_recording.wav"
                self._save_audio(b"".join(chunks), temp_file)
                try:
                    return self._transcribe_audio(temp_file)
                except Exception as e:
                    print(f"Error during transcription: {e}")
                # Transcription failed: start over with a fresh recording.
                chunks = []
                speaking = False
                silence_counter = 0

        return None

    def _save_audio(self, buffer, file_path):
        """Write raw PCM ``buffer`` to ``file_path`` as a WAV file."""
        with wave.open(str(file_path), "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(self.audio.get_sample_size(AUDIO_FORMAT))
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(buffer)

    def _transcribe_audio(self, file_path):
        """Send the audio file to the ``whisper-1`` model and return the text."""
        with open(file_path, "rb") as audio_file:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en",
            )
        return transcription.text

    def cleanup(self):
        """Close the input stream and terminate PyAudio; safe to call twice."""
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.audio:
            self.audio.terminate()
            self.audio = None

class ResponseGenerator:
    """Generates and speaks streaming responses from OpenAI's API.

    The chat completion is streamed, cut into sentence-sized pieces, each
    piece is synthesized to an MP3 file in ``temp_dir`` and queued for a
    background playback thread.
    """

    SYSTEM_PROMPT = (
        "You are a humorous conversation partner engaged in a natural phone call. "
        "Keep your answers concise and to the point."
    )
    MAX_HISTORY_MESSAGES = 8  # Last 4 user/assistant pairs
    SENTENCE_ENDINGS = (".", "!", "?", ":")
    MAX_CHUNK_CHARS = 100  # Flush the buffer once it grows beyond this
    TTS_SPEED = 1.0  # Same speed for every chunk, including the last one
    CLEANUP_DELAY = 0.5  # Seconds to wait before deleting played files

    def __init__(self, openai_client, temp_dir):
        self.client = openai_client
        self.temp_dir = temp_dir
        self.answer = ""

    def generate_streaming_response(self, user_input, conversation_history=None):
        """Stream a chat response, speak it chunk by chunk and return its text.

        Args:
            user_input: Latest user utterance; only sent when no
                conversation history is supplied.
            conversation_history: Optional list of chat messages. When it is
                non-empty, its last ``MAX_HISTORY_MESSAGES`` entries are sent
                instead of ``user_input``. The list is not modified.

        Returns:
            The complete response text (also stored in ``self.answer``).

        Raises:
            Whatever the chat completion request raises; the playback thread
            is still shut down and temp files are still removed in that case.
        """
        self.answer = ""
        collected_messages = []
        chunk_files = []

        # Audio playback queue and control variables
        audio_queue = Queue()
        playing_event = threading.Event()
        stop_event = threading.Event()

        playback_thread = threading.Thread(
            target=self._audio_playback_worker,
            args=(audio_queue, playing_event, stop_event),
        )
        playback_thread.daemon = True
        playback_thread.start()

        try:
            stream = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=self._build_messages(user_input, conversation_history),
                stream=True,
            )

            sentence_buffer = ""
            chunk_counter = 0
            for chunk in stream:
                content = self._delta_text(chunk)
                if not content:
                    continue

                collected_messages.append(content)
                sentence_buffer += content

                # Speak as soon as we have a complete sentence or phrase
                sentence_done = any(end in content for end in self.SENTENCE_ENDINGS)
                if sentence_done or len(sentence_buffer) > self.MAX_CHUNK_CHARS:
                    self._speak_chunk(
                        sentence_buffer, chunk_counter, chunk_files,
                        audio_queue, playing_event,
                        "Error generating speech for chunk",
                    )
                    sentence_buffer = ""
                    chunk_counter += 1

            # Speak any remaining text
            if sentence_buffer.strip():
                self._speak_chunk(
                    sentence_buffer, chunk_counter, chunk_files,
                    audio_queue, playing_event,
                    "Error generating final speech chunk",
                )
        finally:
            # Always end the playback thread (None is the end-of-queue
            # sentinel) and remove the temp files, even if streaming failed.
            audio_queue.put(None)
            playing_event.set()
            playback_thread.join()
            stop_event.set()
            self._cleanup_temp_files(chunk_files)

        self.answer = "".join(collected_messages)
        print(self.answer)
        return self.answer

    def _build_messages(self, user_input, conversation_history):
        """Return the message list for the chat request (system prompt first)."""
        messages = [{"role": "system", "content": self.SYSTEM_PROMPT}]
        if conversation_history:
            messages.extend(conversation_history[-self.MAX_HISTORY_MESSAGES:])
        else:
            messages.append({"role": "user", "content": user_input})
        return messages

    @staticmethod
    def _delta_text(chunk):
        """Return the text carried by a streamed chunk, or None if it has none."""
        if not chunk.choices:
            return None
        delta = getattr(chunk.choices[0], "delta", None)
        return getattr(delta, "content", None)

    def _speak_chunk(self, text, index, chunk_files, audio_queue, playing_event, error_label):
        """Synthesize ``text`` to ``chunk_<index>.mp3`` and queue it for playback.

        Synthesis errors are reported with ``error_label`` and the chunk is
        skipped, so one failed chunk does not abort the whole response.
        """
        chunk_file_path = self.temp_dir / f"chunk_{index}.mp3"
        # Track the path before synthesis so a partially written file is
        # also removed during cleanup.
        chunk_files.append(str(chunk_file_path))
        try:
            response = self.client.audio.speech.create(
                model="tts-1",
                voice="alloy",
                input=text,
                speed=self.TTS_SPEED,
            )
            response.stream_to_file(str(chunk_file_path))
        except Exception as e:
            print(f"{error_label}: {e}")
            return

        audio_queue.put(str(chunk_file_path))
        # Signal playback thread if it's waiting
        playing_event.set()

    def _audio_playback_worker(self, queue, playing_event, stop_event):
        """Thread body: play queued files in order until None is received."""
        while not stop_event.is_set():
            # Wait for a signal that there's something to play
            if queue.empty():
                playing_event.wait(timeout=0.1)
                playing_event.clear()
                continue

            file_path = queue.get()
            if file_path is None:
                break

            try:
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()

                # Wait for playback to complete before moving to next chunk
                clock = pygame.time.Clock()
                while pygame.mixer.music.get_busy() and not stop_event.is_set():
                    clock.tick(30)

                # Small pause between chunks for more natural flow
                time.sleep(0.05)
            except Exception as e:
                print(f"Error playing audio chunk: {e}")

    def _cleanup_temp_files(self, file_list):
        """Delete the given files, reporting (not raising) removal errors."""
        # Wait a moment to ensure files aren't in use
        time.sleep(self.CLEANUP_DELAY)
        for file_path in file_list:
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except Exception as e:
                print(f"Error removing temp file: {e}")

class RotaryDialer:
    """Handles rotary phone dialing and services.

    Dial pulses are counted from the dial GPIO input while the receiver is
    off the hook; dialing "1" starts a spoken conversation with the chat
    model.
    """

    DIGIT_TIMEOUT = 1.5  # Seconds to wait for a digit's remaining pulses

    def __init__(self, openai_client):
        self.client = openai_client
        self.audio_manager = AudioManager()
        self.speech_recognizer = SpeechRecognizer(openai_client)
        self.response_generator = ResponseGenerator(openai_client, self.audio_manager.temp_dir)

        # Set up GPIO
        self.dial_button = Button(DIAL_PIN, pull_up=True)
        self.switch = Button(SWITCH_PIN, pull_up=True)

        # State variables
        self.pulse_count = 0
        # Monotonic timestamp of the last accepted pulse; -inf lets the very
        # first pulse always pass the debounce check.
        self.last_pulse_time = float("-inf")
        self.running = True

    def start(self):
        """Register the GPIO callbacks and run the main loop until stopped."""
        self.dial_button.when_pressed = self._pulse_detected
        self.switch.when_released = self._handle_switch_released
        self.switch.when_pressed = self._handle_switch_pressed

        # Start in ready state
        if not self.switch.is_pressed:
            # Receiver is picked up
            self.audio_manager.start_continuous_tone()
        else:
            # Receiver is on hook
            print("Phone in idle state. Pick up the receiver to begin.")

        print("Rotary dial ready. Dial a number when the receiver is picked up.")

        try:
            self._main_loop()
        except KeyboardInterrupt:
            print("Terminating...")
        finally:
            # Release audio resources on every exit path, not only on Ctrl+C.
            self._cleanup()

    def _main_loop(self):
        """Poll for a dialed number roughly ten times per second."""
        while self.running:
            self._check_number()
            time.sleep(0.1)

    def _pulse_detected(self):
        """GPIO callback: count one dial pulse (debounced, receiver off hook only)."""
        if self.switch.is_pressed:
            return
        # Monotonic clock: wall-clock adjustments must not disturb debouncing.
        current_time = time.monotonic()
        if current_time - self.last_pulse_time > DEBOUNCE_TIME:
            self.pulse_count += 1
            self.last_pulse_time = current_time

    def _check_number(self):
        """Evaluate the dialed digit once pulses have arrived.

        Does nothing while the receiver is on the hook or no pulse has been
        counted. Otherwise the dial tone is stopped, the rest of the digit is
        awaited, and "1" starts the GPT service. Afterwards the dial tone
        returns if the receiver is still off the hook, also for numbers that
        have no service assigned.
        """
        if self.switch.is_pressed or self.pulse_count <= 0:
            return

        self.audio_manager.stop_continuous_tone()
        time.sleep(self.DIGIT_TIMEOUT)  # Wait between digits

        if self.pulse_count == 10:
            self.pulse_count = 0  # "0" is sent as 10 pulses

        print("Dialed service number:", self.pulse_count)

        if self.pulse_count == 1:
            self._call_gpt_service()

        # Return to dial tone, but only if the receiver wasn't hung up
        if not self.switch.is_pressed:
            self._reset_state()

        self.pulse_count = 0

    def _call_gpt_service(self):
        """Run the listen/respond conversation loop until the receiver is hung up."""
        # Conversation history for context
        conversation_history = []
        first_interaction = True

        speech_recognizer = self.speech_recognizer
        response_generator = self.response_generator

        # The next recording is started while the current response is being
        # generated; its result arrives through this queue.
        next_recording_thread = None
        next_recording_queue = Queue()

        while not self.switch.is_pressed:
            if next_recording_thread:
                # Use the result of the recording started in the last round
                next_recording_thread.join()
                recognized_text = next_recording_queue.get()
                next_recording_thread = None
            else:
                # Only during first iteration or as fallback
                print("Listening..." + (" (Speak now)" if first_interaction else ""))
                first_interaction = False
                recognized_text = speech_recognizer.capture_and_transcribe()

            if not recognized_text:
                print("Could not recognize your speech")
                self.audio_manager.play_file(AUDIO_FILES["try_again"])
                continue

            print("Understood:", recognized_text)
            conversation_history.append({"role": "user", "content": recognized_text})

            # Start the next recording thread PARALLEL to API response
            next_recording_thread = threading.Thread(
                target=self._background_capture,
                args=(speech_recognizer, next_recording_queue),
            )
            next_recording_thread.daemon = True
            next_recording_thread.start()

            try:
                response = response_generator.generate_streaming_response(
                    recognized_text, conversation_history
                )
            except Exception as e:
                # A failed request must not take the whole phone down:
                # report it, play the error sound and keep the call going.
                print(f"Error generating response: {e}")
                self.audio_manager.play_file(AUDIO_FILES["error"])
                continue

            conversation_history.append({"role": "assistant", "content": response})

        # If we get here, the receiver was hung up
        if next_recording_thread and next_recording_thread.is_alive():
            next_recording_thread.join(timeout=0.5)

    def _background_capture(self, recognizer, result_queue):
        """Thread body: record one utterance and put its text (or None) on the queue."""
        try:
            result = recognizer.capture_and_transcribe()
            result_queue.put(result)
        except Exception as e:
            print(f"Error in background recording: {e}")
            result_queue.put(None)

    def _reset_state(self):
        """Clear the pulse counter and restart the dial tone."""
        self.pulse_count = 0
        self.audio_manager.stop_continuous_tone()
        self.audio_manager.start_continuous_tone()
        print("Rotary dial ready. Dial a number.")

    def _handle_switch_released(self):
        """GPIO callback for the receiver being picked up: restart the script."""
        print("Receiver picked up - System restarting")
        self._restart_script()

    def _handle_switch_pressed(self):
        """GPIO callback for hang-up: release resources, then restart after 1 s."""
        print("Receiver hung up - System terminating")
        self._cleanup()
        self.running = False
        # Complete termination after short delay
        threading.Timer(1.0, self._restart_script).start()

    def _restart_script(self):
        """Stop the tone and replace this process with a fresh run of the script."""
        print("Script restarting...")
        self.audio_manager.stop_continuous_tone()
        os.execv(sys.executable, ['python'] + sys.argv)

    def _cleanup(self):
        """Stop the dial tone and release the speech recognizer's resources."""
        self.audio_manager.stop_continuous_tone()
        if hasattr(self, 'speech_recognizer') and self.speech_recognizer:
            self.speech_recognizer.cleanup()
        print("Resources have been released.")

def main():
    """Build the OpenAI client, run the rotary dialer and report termination."""
    dialer = RotaryDialer(OpenAI(api_key=OPENAI_API_KEY))
    dialer.start()
    print("Program terminated.")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()