#!/usr/bin/python3 # SPDX-License-Identifier: LGPL-2.1-or-later from __future__ import absolute_import, print_function, unicode_literals import os import signal import syslog import dbus import dbus.service import dbus.mainloop.glib try: from gi.repository import GLib except ImportError: import gobject as GLib from functools import partial # The ENV VAR if set should be of the form 'hci0', 'hci1' etc. DEVICE_ENV_VAR = 'BLUETOOTH_ADAPTER' DEVICE_PATH_BASE = '/org/bluez/' DEVICE_DEFAULT = 'hci0' AGENT_INTERFACE = 'org.bluez.Agent1' AGENT_PATH = "/org/bluez/promiscuousAgent" def set_trusted(path): props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") props.Set("org.bluez.Device1", "Trusted", True) class Rejected(dbus.DBusException): _dbus_error_name = "org.bluez.Error.Rejected" class Agent(dbus.service.Object): exit_on_release = True def set_exit_on_release(self, exit_on_release): self.exit_on_release = exit_on_release @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Release(self): syslog.syslog("Release") if self.exit_on_release: mainloop.quit() @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def AuthorizeService(self, device, uuid): print(f"AuthorizeService ({device}, {uuid})") if uuid == "0000110B-0000-1000-8000-00805F9B34FB": print("Authorized A2DP Service") return syslog.syslog("Rejecting non-A2DP Service") raise Rejected("Connection rejected") @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="") def RequestConfirmation(self, device, passkey): print(f"RequestConfirmation ({device}, {passkey})") print("Auto confirming...") @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def RequestPinCode(self, device): print(f"RequestPinCode '0000' from {device}") set_trusted(device) return "0000" @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): print("Cancel") def quit(manager, mloop): manager.UnregisterAgent(AGENT_PATH) print("\nAgent unregistered") mloop.quit() if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() capability = "DisplayYesNo" agent = Agent(bus, AGENT_PATH) mainloop = GLib.MainLoop() obj = bus.get_object("org.bluez", "/org/bluez") # Get the device from ENV_VAR if set adapters=os.getenv(DEVICE_ENV_VAR, DEVICE_DEFAULT).split(",") for adapter in adapters: adapterPath = DEVICE_PATH_BASE + adapter # Set Discoverable and Pairable to always on print(f"Setting {adapterPath} to 'discoverable' and 'pairable'...") prop = dbus.Interface(bus.get_object("org.bluez", adapterPath), "org.freedesktop.DBus.Properties") prop.Set("org.bluez.Adapter1", "DiscoverableTimeout", dbus.UInt32(0)) prop.Set("org.bluez.Adapter1", "PairableTimeout", dbus.UInt32(0)) prop.Set("org.bluez.Adapter1", "Discoverable", dbus.Boolean(True)) prop.Set("org.bluez.Adapter1", "Pairable", dbus.Boolean(True)) # Create the agent manager manager = dbus.Interface(obj, "org.bluez.AgentManager1") manager.RegisterAgent(AGENT_PATH, capability) manager.RequestDefaultAgent(AGENT_PATH) print("Agent registered") # Ensure that ctrl+c is caught properly ## Assign the 'quit' function to a variable mquit = partial(quit, manager=manager, mloop=mainloop) GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, mquit) mainloop.run()