Talk to Your House: A Gemini-Powered Voice Assistant on Arduino UNO Q
A production-grade, hands-free AI agent combining Gemini voice, wake word detection, LED animations, and multi-protocol smart home control — all running on a single board.
Devices & Components
1
Modulino™ Buttons
1
Modulino™ Thermo
1
Arduino® UNO™ Q 4GB
1
Modulino™ Pixels
1
Arduino USB-C Hub (8 in 1)
1
USB Conference Speaker/Mic
1
Samsung Smart TV
1
Smart Plug SONOFF
1
USB A Camera
1
Smart Bulb Wizz
Software & Tools
1
samsungtvws
1
DuckDuckGo Search (ddgs)
1
OpenCV (headless)
Arduino App Lab
1
Google Genai SDK
1
chip-tool (Matter)
1
pywizlight
1
FastAPI + Uvicorn
1
Python 3.11+
1
Paramiko
Project description
Code
main
python
main py of applab
1"""Q AI Agent V4 — FastAPI entry point.""" 2 3import os 4import json 5import subprocess 6import asyncio 7from contextlib import asynccontextmanager 8from datetime import datetime, timezone 9from pathlib import Path 10 11from fastapi import FastAPI, WebSocket 12from fastapi.staticfiles import StaticFiles 13from fastapi.responses import FileResponse, JSONResponse 14from dotenv import load_dotenv 15import uvicorn 16 17load_dotenv("/app/.env") 18 19from automations import AutomationEngine 20from gemini_session import websocket_endpoint 21from headless_voice import headless_button_loop, _warm_up as warm_up_headless 22from camera import IMAGES_DIR 23import mcu 24import interaction_log 25from devices import samsung 26 27from wake_word import start_wake_word, stop_wake_word 28 29BASE_DIR = Path(__file__).parent.parent 30DATA_DIR = BASE_DIR / "data" 31ASSETS_DIR = BASE_DIR / "assets" 32 33_automation_engine: AutomationEngine | None = None 34SENSOR_LOG_PATH = DATA_DIR / "sensor_log.json" 35SENSOR_LOG_MAX = 1440 # 24h at 1 reading/min 36 37 38def _load_sensor_log() -> list: 39 if SENSOR_LOG_PATH.exists(): 40 try: 41 return json.loads(SENSOR_LOG_PATH.read_text()) 42 except Exception: 43 pass 44 return [] 45 46 47def _save_sensor_log(log: list): 48 try: 49 SENSOR_LOG_PATH.write_text(json.dumps(log)) 50 except Exception as e: 51 print(f"[SensorLog] Save error: {e}") 52 53 54@asynccontextmanager 55async def lifespan(app: FastAPI): 56 global _automation_engine 57 DATA_DIR.mkdir(exist_ok=True) 58 _automation_engine = AutomationEngine(str(DATA_DIR / "automations.json")) 59 print(f"[Main] Automation engine started ({len(_automation_engine.list_schedules())} schedules, " 60 f"{len(_automation_engine.list_triggers())} triggers)") 61 62 warm_up_headless() 63 64 schedule_task = asyncio.create_task(_schedule_loop()) 65 trigger_task = asyncio.create_task(_trigger_loop()) 66 sensor_task = asyncio.create_task(_sensor_log_loop()) 67 headless_task = asyncio.create_task(headless_button_loop(automation_engine=_automation_engine)) 68 69 # Start wake word listener (needs the running event loop) 70 loop = asyncio.get_running_loop() 71 start_wake_word(loop) 72 73 yield 74 75 stop_wake_word() 76 schedule_task.cancel() 77 trigger_task.cancel() 78 sensor_task.cancel() 79 headless_task.cancel() 80 for task in (schedule_task, trigger_task, sensor_task, headless_task): 81 try: 82 await task 83 except asyncio.CancelledError: 84 pass 85 86 87async def _schedule_loop(): 88 """Check scheduled automations every 30 seconds.""" 89 while True: 90 try: 91 results = await _automation_engine.check_schedules() 92 for r in results: 93 print(f"[Schedule] {r}") 94 except Exception as e: 95 print(f"[Schedule] Error: {e}") 96 await asyncio.sleep(30) 97 98 99async def _trigger_loop(): 100 """Evaluate sensor triggers every 30 seconds using cached sensor values. 101 Only runs when a session has populated the cache (avoids cold MCU polling).""" 102 await asyncio.sleep(15) # Give session time to connect and warm up cache 103 while True: 104 try: 105 cached = mcu.get_cached_sensors() 106 temp, humidity = cached["temp"], cached["humidity"] 107 if temp != 0.0 or humidity != 0.0: 108 results = await _automation_engine.check_triggers(temp, humidity) 109 for r in results: 110 print(f"[Trigger] {r}") 111 except Exception as e: 112 print(f"[Trigger] Error: {e}") 113 await asyncio.sleep(30) 114 115 116async def _sensor_log_loop(): 117 """Log temperature and humidity every 60 seconds from the session cache.""" 118 log = _load_sensor_log() 119 await asyncio.sleep(30) # Wait for session to warm up cache 120 while True: 121 try: 122 cached = mcu.get_cached_sensors() 123 temp, humidity = cached["temp"], cached["humidity"] 124 if temp != 0.0 or humidity != 0.0: 125 from zoneinfo import ZoneInfo 126 import os as _os 127 tz = ZoneInfo(_os.getenv("LOCAL_TZ", "Europe/Rome")) 128 ts = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") 129 log.append({"ts": ts, "temp": round(temp, 1), "humidity": round(humidity, 1)}) 130 if len(log) > SENSOR_LOG_MAX: 131 log = log[-SENSOR_LOG_MAX:] 132 _save_sensor_log(log) 133 except Exception as e: 134 print(f"[SensorLog] Error: {e}") 135 await asyncio.sleep(60) 136 137 138app = FastAPI(lifespan=lifespan) 139app.mount("/assets", StaticFiles(directory=str(ASSETS_DIR)), name="assets") 140 141 142@app.get("/") 143def read_root(): 144 return FileResponse(str(ASSETS_DIR / "index.html")) 145 146 147@app.get("/camera_images/{filename}") 148async def get_camera_image(filename: str): 149 filepath = os.path.join(IMAGES_DIR, filename) 150 if os.path.exists(filepath) and filepath.startswith(IMAGES_DIR): 151 return FileResponse(filepath, media_type="image/jpeg") 152 return {"error": "Image not found"} 153 154 155@app.get("/api/automations") 156async def get_automations(): 157 if _automation_engine: 158 return _automation_engine.data 159 return {"schedules": [], "triggers": []} 160 161 162@app.get("/api/sensors") 163async def get_sensors(): 164 """Return recent sensor log (up to last 1440 readings = 24h).""" 165 return JSONResponse(_load_sensor_log()) 166 167 168@app.post("/api/automations/test") 169async def test_automation(body: dict): 170 """Test-fire an automation action directly.""" 171 if not _automation_engine: 172 return {"error": "Automation engine not available"} 173 result = await _automation_engine.execute_action(body) 174 return {"result": result} 175 176 177@app.get("/api/logs") 178async def get_logs(since: int = 0): 179 """Return interaction log entries newer than since_id.""" 180 return JSONResponse(interaction_log.get(since)) 181 182 183@app.post("/api/tv/power") 184async def tv_power(body: dict): 185 action = body.get("action", "on") 186 if action == "on": 187 success = await samsung.turn_on() 188 else: 189 success = await samsung.turn_off() 190 interaction_log.add("tv_command", f"Power {action}", source="ui", 191 details={"success": success}) 192 return {"success": success} 193 194 195@app.post("/api/tv/key") 196async def tv_key(body: dict): 197 key = body.get("key", "") 198 success = await samsung.send_key(key) 199 interaction_log.add("tv_command", f"Key {key}", source="ui", 200 details={"success": success}) 201 return {"success": success} 202 203 204@app.post("/api/tv/channel") 205async def tv_channel(body: dict): 206 channel = int(body.get("channel", 1)) 207 success = await samsung.set_channel(channel) 208 interaction_log.add("tv_command", f"Channel {channel}", source="ui", 209 details={"success": success}) 210 return {"success": success} 211 212 213@app.post("/api/tv/app") 214async def tv_launch_app(body: dict): 215 app_name = body.get("app", "") 216 success = await samsung.launch_app(app_name) 217 interaction_log.add("tv_command", f"Launch {app_name}", source="ui", 218 details={"success": success}) 219 return {"success": success} 220 221 222@app.websocket("/ws") 223async def ws_endpoint(websocket: WebSocket): 224 await websocket_endpoint(websocket, automation_engine=_automation_engine) 225 226 227if __name__ == "__main__": 228 cert_file = "/tmp/vision-agent-cert.pem" 229 key_file = "/tmp/vision-agent-key.pem" 230 231 if not os.path.exists(cert_file) or not os.path.exists(key_file): 232 print("Generating self-signed SSL certificate...") 233 try: 234 subprocess.run([ 235 "openssl", "req", "-x509", "-newkey", "rsa:2048", "-nodes", 236 "-out", cert_file, "-keyout", key_file, "-days", "365", 237 "-subj", "/CN=arduino-uno-q" 238 ], check=True, capture_output=True) 239 print("Certificate generated successfully.") 240 except Exception as e: 241 print(f"Error generating certificate: {e}") 242 cert_file = key_file = None 243 244 if cert_file and key_file: 245 uvicorn.run(app, host="0.0.0.0", port=5004, ssl_keyfile=key_file, ssl_certfile=cert_file) 246 else: 247 uvicorn.run(app, host="0.0.0.0", port=5004)
Arduino App Lab
q-ai-agent-v4
Q AI Agent V4 - Modular voice assistant with scheduled and sensor-triggered automations.
🤖
q-ai-agent-v4
Downloadable files
chip-tool
Chip tool to control matter devices
chip-tool.zip
Documentation
IMG_0177_compressed
IMG_0177_compressed.mov
Comments
Only logged in users can leave comments