#!/usr/bin/python3 # # a2dp-agent runs as daemon started by /etc/rc.d/rc.bluez-alsa # This daemon listens to bluetooth dbus messages. # Heavily based on existing scripts. # # Version 1.0 03-11-2025 import subprocess import signal import syslog import sys import dbus import dbus.service import dbus.mainloop.glib try: from gi.repository import GLib except ImportError: import gobject as GLib from functools import partial AGENT_INTERFACE = "org.bluez.Agent1" AGENT_PATH = "/test/agent" Status_Connected = 0 Status_Authorized = 0 Status_Hook = 0 def device_property_changed_cb(iface, changed_props, invalidated_props, path=None, interface=None): if iface != "org.bluez.Device1": return global Status_Connected global Status_Authorized global Status_Hook device = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") Status_Connected = device.Get("org.bluez.Device1", "Connected") syslog.syslog("device_property_changed_cb") if Status_Connected == 0: Status_Authorized = 0 if Status_Hook == 1: subprocess.run("a2dp-hook stop", shell=True, check=True) Status_Hook = 0 syslog.syslog("Connected {}, Authorized {}, Hook {}".format(Status_Connected, Status_Authorized, Status_Hook)) class Rejected(dbus.DBusException): _dbus_error_name = "org.bluez.Error.Rejected" class Agent(dbus.service.Object): exit_on_release = True def set_exit_on_release(self, exit_on_release): self.exit_on_release = exit_on_release @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Release(self): syslog.syslog("Release") if self.exit_on_release: mainloop.quit() @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def AuthorizeService(self, device, uuid): syslog.syslog("AuthorizeService ({0:s}, {1:s})".format(device, uuid)) global Status_Connected global Status_Authorized global Status_Hook if uuid == "0000110d-0000-1000-8000-00805f9b34fb" and Status_Connected == 1: syslog.syslog("Authorized A2DP Service") Status_Authorized = 1 if Status_Hook == 0: subprocess.run("a2dp-hook start", shell=True, check=True) # syslog.syslog("Running cmd: {0:s}".format(cmd)) Status_Hook = 1 syslog.syslog("Connected {}, Authorized {}, Hook {}".format(Status_Connected, Status_Authorized, Status_Hook)) return syslog.syslog("Rejecting non-A2DP Service") raise Rejected("Connection rejected") @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def DisplayPinCode(self, device, pincode): syslog.syslog("DisplayPinCode ({0:s}, {1:s})".format(device, pincode)) @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="") def RequestConfirmation(self, device, passkey): syslog.syslog("RequestConfirmation ({0:s}, {1:06d})".format(device, passkey)) return @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def RequestPinCode(self, device): syslog.syslog("RequestPinCode '0000' from {1:s}".format(device)) set_trusted(device) return "0000" @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="") def RequestAuthorization(self, device): syslog.syslog("RequestAuthorization ({0:s})".format(device)) raise Rejected("Pairing rejected") @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): syslog.syslog("Cancel") def quit(manager, mloop): manager.UnregisterAgent(AGENT_PATH) syslog.syslog("Agent unregistered") mloop.quit() if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() # listen for signals on the Bluez bus bus.add_signal_receiver( device_property_changed_cb, bus_name = "org.bluez", signal_name = "PropertiesChanged", path_keyword = "path", interface_keyword = "interface") capability = "DisplayYesNo" agent = Agent(bus, AGENT_PATH) mainloop = GLib.MainLoop() obj = bus.get_object("org.bluez", "/org/bluez") # Create the agent manager manager = dbus.Interface(obj, "org.bluez.AgentManager1") manager.RegisterAgent(AGENT_PATH, capability) manager.RequestDefaultAgent(AGENT_PATH) syslog.openlog(ident="a2dp-agent",logoption=syslog.LOG_PID, facility=syslog.LOG_USER) syslog.syslog('A2DP Agent Registered') # Ensure that ctrl+c is caught properly ## Assign the 'quit' function to a variable mquit = partial(quit, manager=manager, mloop=mainloop) GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, mquit) mainloop.run()