D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
proc
/
self
/
root
/
lib64
/
python3.6
/
site-packages
/
dbus
/
Filename :
bus.py
back
Copy
# Copyright (C) 2007 Collabora Ltd. <http://www.collabora.co.uk/>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use, copy,
# modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
__all__ = ('BusConnection',)
__docformat__ = 'reStructuredText'

import logging
import weakref

from _dbus_bindings import (
    BUS_DAEMON_IFACE, BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_SESSION,
    BUS_STARTER, BUS_SYSTEM, DBUS_START_REPLY_ALREADY_RUNNING,
    DBUS_START_REPLY_SUCCESS, NAME_FLAG_ALLOW_REPLACEMENT,
    NAME_FLAG_DO_NOT_QUEUE, NAME_FLAG_REPLACE_EXISTING,
    RELEASE_NAME_REPLY_NON_EXISTENT, RELEASE_NAME_REPLY_NOT_OWNER,
    RELEASE_NAME_REPLY_RELEASED, REQUEST_NAME_REPLY_ALREADY_OWNER,
    REQUEST_NAME_REPLY_EXISTS, REQUEST_NAME_REPLY_IN_QUEUE,
    REQUEST_NAME_REPLY_PRIMARY_OWNER, validate_bus_name, validate_error_name,
    validate_interface_name, validate_member_name, validate_object_path)
from dbus.connection import Connection
from dbus.exceptions import DBusException
from dbus.lowlevel import HANDLER_RESULT_NOT_YET_HANDLED
from dbus._compat import is_py2


# Match-rule template for NameOwnerChanged signals from the bus daemon.
# The doubled '%%s' survives the first substitution as '%s', leaving one
# placeholder to be filled in later with the name being watched.
_NAME_OWNER_CHANGE_MATCH = ("type='signal',sender='%s',"
                            "interface='%s',member='NameOwnerChanged',"
                            "path='%s',arg0='%%s'"
                            % (BUS_DAEMON_NAME, BUS_DAEMON_IFACE,
                               BUS_DAEMON_PATH))
"""(_NAME_OWNER_CHANGE_MATCH % sender) matches relevant NameOwnerChange messages"""

# D-Bus error name compared against DBusException.get_dbus_name() to
# recognise "this name currently has no owner".
_NAME_HAS_NO_OWNER = 'org.freedesktop.DBus.Error.NameHasNoOwner'

_logger = logging.getLogger('dbus.bus')


class NameOwnerWatch(object):
    """Track the owner of a bus name and report it to a callback.

    On construction this subscribes to the bus daemon's ``NameOwnerChanged``
    signal for `bus_name` and issues an asynchronous ``GetNameOwner`` call
    to learn the current owner. Use `cancel` to stop watching.
    """

    __slots__ = ('_match', '_pending_call')

    def __init__(self, bus_conn, bus_name, callback):
        """Start watching `bus_name` on `bus_conn`.

        :Parameters:
            `bus_conn`
                The connection on which `add_signal_receiver` and
                `call_async` are invoked.
            `bus_name` : str
                The bus name to watch; checked with `validate_bus_name`.
            `callback` : callable
                Called with one argument: the new owner reported by
                ``NameOwnerChanged`` or ``GetNameOwner``, or ``''`` when
                ``GetNameOwner`` fails because the name has no owner.
        """
        validate_bus_name(bus_name)

        def signal_cb(owned, old_owner, new_owner):
            # Only the new owner is of interest to the caller.
            callback(new_owner)

        def error_cb(e):
            if e.get_dbus_name() == _NAME_HAS_NO_OWNER:
                # "No owner" is reported as the empty string, not an error.
                callback('')
            else:
                # NOTE(review): basicConfig() is called from library code
                # here, presumably so the debug record has a handler to go
                # to -- confirm this side effect is intended.
                logging.basicConfig()
                _logger.debug('GetNameOwner(%s) failed:', bus_name,
                              exc_info=(e.__class__, e, None))

        self._match = bus_conn.add_signal_receiver(signal_cb,
                                                   'NameOwnerChanged',
                                                   BUS_DAEMON_IFACE,
                                                   BUS_DAEMON_NAME,
                                                   BUS_DAEMON_PATH,
                                                   arg0=bus_name)
        keywords = {}
        # utf8_strings is only passed on Python 2.
        if is_py2:
            keywords['utf8_strings'] = True
        # The successful reply is delivered straight to `callback`.
        self._pending_call = bus_conn.call_async(BUS_DAEMON_NAME,
                                                 BUS_DAEMON_PATH,
                                                 BUS_DAEMON_IFACE,
                                                 'GetNameOwner',
                                                 's', (bus_name,),
                                                 callback, error_cb,
                                                 **keywords)

    def cancel(self):
        """Stop watching: remove the signal match and cancel the pending
        ``GetNameOwner`` call, if they are still set.

        Both references are cleared afterwards, so calling this again does
        nothing.
        """
        if self._match is not None:
            self._match.remove()
        if self._pending_call is not None:
            self._pending_call.cancel()
        self._match = None
        self._pending_call = None


class BusConnection(Connection):
    """A connection to a D-Bus daemon that implements the
    ``org.freedesktop.DBus`` pseudo-service.

    :Since: 0.81.0
    """

    TYPE_SESSION = BUS_SESSION
    """Represents a session bus (same as the global dbus.BUS_SESSION)"""

    TYPE_SYSTEM = BUS_SYSTEM
    """Represents the system bus (same as the global dbus.BUS_SYSTEM)"""

    TYPE_STARTER = BUS_STARTER
    """Represents the bus that started this service by activation (same as the global dbus.BUS_STARTER)"""

    START_REPLY_SUCCESS = DBUS_START_REPLY_SUCCESS
    START_REPLY_ALREADY_RUNNING = DBUS_START_REPLY_ALREADY_RUNNING

    def __new__(cls, address_or_type=TYPE_SESSION, mainloop=None):
        """Create the connection via ``cls._new_for_bus`` and attach the
        per-connection bookkeeping attributes.

        :Parameters:
            `address_or_type`
                Passed through to ``_new_for_bus``; defaults to
                `TYPE_SESSION`.
            `mainloop`
                Passed through to ``_new_for_bus`` as a keyword argument.
        :Returns: the new connection object
        """
        bus = cls._new_for_bus(address_or_type, mainloop=mainloop)

        # _bus_names is used by dbus.service.BusName!
        bus._bus_names = weakref.WeakValueDictionary()

        bus._signal_sender_matches = {}
        """Map from SignalMatch to NameOwnerWatch."""

        return bus

    def add_signal_receiver(self, handler_function, signal_name=None,
                            dbus_interface=None, bus_name=None,
                            path=None, **keywords):
        """Register `handler_function` for matching signals and add the
        corresponding match rule on the bus daemon.

        The arguments are forwarded to the superclass implementation, which
        creates the match object. If `bus_name` is given and is not the bus
        daemon's own name, a name-owner watch is also set up and remembered
        in ``_signal_sender_matches``:

        - for a unique name (starting with ``:``), the match is removed when
          the owner is reported as ``''``;
        - otherwise, each reported owner is passed to the match's
          ``set_sender_name_owner``.

        Finally the rule is added with the blocking `add_match_string`.

        :Parameters:
            `named_service`
                Deprecated keyword alias for `bus_name`; emits a
                `DeprecationWarning`.
        :Returns: the match object created by the superclass
        :Raises TypeError: if both `bus_name` and `named_service` are given
        """
        named_service = keywords.pop('named_service', None)
        if named_service is not None:
            if bus_name is not None:
                raise TypeError('bus_name and named_service cannot both be '
                                'specified')
            bus_name = named_service
            from warnings import warn
            warn('Passing the named_service parameter to add_signal_receiver '
                 'by name is deprecated: please use positional parameters',
                 DeprecationWarning, stacklevel=2)

        match = super(BusConnection, self).add_signal_receiver(
                handler_function, signal_name, dbus_interface, bus_name,
                path, **keywords)

        # Signals from the bus daemon itself need no owner tracking.
        if (bus_name is not None and bus_name != BUS_DAEMON_NAME):
            if bus_name[:1] == ':':
                def callback(new_owner):
                    # The unique name no longer has an owner: drop the match.
                    if new_owner == '':
                        match.remove()
            else:
                callback = match.set_sender_name_owner
            watch = self.watch_name_owner(bus_name, callback)
            self._signal_sender_matches[match] = watch

        self.add_match_string(str(match))

        return match

    def _clean_up_signal_match(self, match):
        """Undo the bus-side state for `match`: remove its match rule
        (without blocking) and cancel the name-owner watch recorded for it
        in ``_signal_sender_matches``, if any.
        """
        # The signals lock is no longer held here (it was in <= 0.81.0)
        self.remove_match_string_non_blocking(str(match))
        watch = self._signal_sender_matches.pop(match, None)
        if watch is not None:
            watch.cancel()

    def activate_name_owner(self, bus_name):
        """Return the owner of `bus_name`, starting the service if the name
        currently has no owner.

        If `bus_name` is None, starts with ``:`` or is the bus daemon's
        name, it is returned unchanged.

        :Raises `DBusException`: if looking up the owner fails for a reason
            other than the name having no owner, or if starting the service
            or the second lookup fails.
        """
        if (bus_name is not None and bus_name[:1] != ':'
                and bus_name != BUS_DAEMON_NAME):
            try:
                return self.get_name_owner(bus_name)
            except DBusException as e:
                if e.get_dbus_name() != _NAME_HAS_NO_OWNER:
                    raise
                # else it doesn't exist: try to start it
                self.start_service_by_name(bus_name)
                return self.get_name_owner(bus_name)
        else:
            # already unique
            return bus_name

    def get_object(self, bus_name, object_path, introspect=True,
                   follow_name_owner_changes=False, **kwargs):
        """Return a local proxy for the given remote object.

        Method calls on the proxy are translated into method calls on the
        remote object.

        :Parameters:
            `bus_name` : str
                A bus name (either the unique name or a well-known name)
                of the application owning the object. The keyword argument
                named_service is a deprecated alias for this.
            `object_path` : str
                The object path of the desired object
            `introspect` : bool
                If true (default), attempt to introspect the remote
                object to find out supported methods and their signatures
            `follow_name_owner_changes` : bool
                If the bus name is a well-known name and this parameter
                is false (default), resolve the well-known name to the unique
                name of its current owner and bind to that instead; if the
                ownership of the well-known name changes in future,
                keep communicating with the original owner.
                This is necessary if the D-Bus API used is stateful.

                If the bus name is a well-known name and this parameter
                is true, whenever the well-known name changes ownership in
                future, bind to the new owner, if any.

                If the given bus name is a unique name, this parameter
                has no effect.

        :Returns: a `dbus.proxies.ProxyObject`
        :Raises `DBusException`: if resolving the well-known name to a
            unique name fails
        :Raises TypeError: if both `bus_name` and `named_service` are given,
            or if any other unexpected keyword argument is passed
        """
        if follow_name_owner_changes:
            self._require_main_loop()   # we don't get the signals otherwise

        named_service = kwargs.pop('named_service', None)
        if named_service is not None:
            if bus_name is not None:
                raise TypeError('bus_name and named_service cannot both '
                                'be specified')
            from warnings import warn
            warn('Passing the named_service parameter to get_object by name '
                 'is deprecated: please use positional parameters',
                 DeprecationWarning, stacklevel=2)
            bus_name = named_service
        # Anything left over is an unsupported keyword argument.
        if kwargs:
            raise TypeError('get_object does not take these keyword '
                            'arguments: %s' % ', '.join(kwargs.keys()))

        return self.ProxyObjectClass(self, bus_name, object_path,
                                     introspect=introspect,
                                     follow_name_owner_changes=follow_name_owner_changes)

    def get_unix_user(self, bus_name):
        """Get the numeric uid of the process owning the given bus name.

        :Parameters:
            `bus_name` : str
                A bus name, either unique or well-known
        :Returns: a `dbus.UInt32`
        :Since: 0.80.0
        """
        validate_bus_name(bus_name)
        return self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                  BUS_DAEMON_IFACE, 'GetConnectionUnixUser',
                                  's', (bus_name,))

    def start_service_by_name(self, bus_name, flags=0):
        """Start a service which will implement the given bus name on this Bus.

        :Parameters:
            `bus_name` : str
                The well-known bus name to be activated.
            `flags` : dbus.UInt32
                Flags to pass to StartServiceByName (currently none are
                defined)

        :Returns: A tuple of 2 elements. The first is always True, the
            second is either START_REPLY_SUCCESS or
            START_REPLY_ALREADY_RUNNING.

        :Raises `DBusException`: if the service could not be started.
        :Since: 0.80.0
        """
        validate_bus_name(bus_name)
        return (True, self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                         BUS_DAEMON_IFACE,
                                         'StartServiceByName',
                                         'su', (bus_name, flags)))

    # XXX: it might be nice to signal IN_QUEUE, EXISTS by exception,
    # but this would not be backwards-compatible
    def request_name(self, name, flags=0):
        """Request a bus name.

        :Parameters:
            `name` : str
                The well-known name to be requested
            `flags` : dbus.UInt32
                A bitwise-OR of 0 or more of the flags
                `NAME_FLAG_ALLOW_REPLACEMENT`,
                `NAME_FLAG_REPLACE_EXISTING`
                and `NAME_FLAG_DO_NOT_QUEUE`
        :Returns: `REQUEST_NAME_REPLY_PRIMARY_OWNER`,
            `REQUEST_NAME_REPLY_IN_QUEUE`,
            `REQUEST_NAME_REPLY_EXISTS` or
            `REQUEST_NAME_REPLY_ALREADY_OWNER`
        :Raises `DBusException`: if the bus daemon cannot be contacted or
            returns an error.
        """
        validate_bus_name(name, allow_unique=False)
        return self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                  BUS_DAEMON_IFACE, 'RequestName',
                                  'su', (name, flags))

    def release_name(self, name):
        """Release a bus name.

        :Parameters:
            `name` : str
                The well-known name to be released
        :Returns: `RELEASE_NAME_REPLY_RELEASED`,
            `RELEASE_NAME_REPLY_NON_EXISTENT`
            or `RELEASE_NAME_REPLY_NOT_OWNER`
        :Raises `DBusException`: if the bus daemon cannot be contacted or
            returns an error.
        """
        validate_bus_name(name, allow_unique=False)
        return self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                  BUS_DAEMON_IFACE, 'ReleaseName',
                                  's', (name,))

    def list_names(self):
        """Return a list of all currently-owned names on the bus.

        :Returns: a dbus.Array of dbus.UTF8String
        :Since: 0.81.0
        """
        keywords = {}
        # utf8_strings is only passed on Python 2.
        if is_py2:
            keywords['utf8_strings'] = True
        return self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                  BUS_DAEMON_IFACE, 'ListNames',
                                  '', (), **keywords)

    def list_activatable_names(self):
        """Return a list of all names that can be activated on the bus.

        :Returns: a dbus.Array of dbus.UTF8String
        :Since: 0.81.0
        """
        keywords = {}
        # utf8_strings is only passed on Python 2.
        if is_py2:
            keywords['utf8_strings'] = True
        return self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                  BUS_DAEMON_IFACE, 'ListActivatableNames',
                                  '', (), **keywords)

    def get_name_owner(self, bus_name):
        """Return the unique connection name of the primary owner of the
        given name.

        The name is checked with ``validate_bus_name(..., allow_unique=False)``
        before the bus daemon is asked.

        :Raises `DBusException`: if the `bus_name` has no owner
        :Since: 0.81.0
        """
        keywords = {}
        # utf8_strings is only passed on Python 2.
        if is_py2:
            keywords['utf8_strings'] = True
        validate_bus_name(bus_name, allow_unique=False)
        return self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                  BUS_DAEMON_IFACE, 'GetNameOwner',
                                  's', (bus_name,), **keywords)

    def watch_name_owner(self, bus_name, callback):
        """Watch the unique connection name of the primary owner of the
        given name.

        `callback` will be called with one argument, which is either the
        unique connection name, or the empty string (meaning the name is
        not owned).

        :Returns: a `NameOwnerWatch`; call its ``cancel()`` to stop watching
        :Since: 0.81.0
        """
        return NameOwnerWatch(self, bus_name, callback)

    def name_has_owner(self, bus_name):
        """Return True iff the given bus name has an owner on this bus.

        The name is sent to the bus daemon without being validated locally.

        :Parameters:
            `bus_name` : str
                The bus name to look up
        :Returns: a `bool`
        """
        return bool(self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                                       BUS_DAEMON_IFACE, 'NameHasOwner',
                                       's', (bus_name,)))

    def add_match_string(self, rule):
        """Arrange for this application to receive messages on the bus that
        match the given rule. This version will block.

        :Parameters:
            `rule` : str
                The match rule
        :Raises `DBusException`: on error.
        :Since: 0.80.0
        """
        self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                           BUS_DAEMON_IFACE, 'AddMatch', 's', (rule,))

    # FIXME: add an async success/error handler capability?
    # (and the same for remove_...)
    def add_match_string_non_blocking(self, rule):
        """Arrange for this application to receive messages on the bus that
        match the given rule. This version will not block, but any errors
        will be ignored.

        No reply or error handler is registered for the asynchronous
        ``AddMatch`` call, so a failure reported by the bus daemon is not
        delivered to the caller.

        :Parameters:
            `rule` : str
                The match rule
        :Since: 0.80.0
        """
        self.call_async(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                        BUS_DAEMON_IFACE, 'AddMatch', 's', (rule,),
                        None, None)

    def remove_match_string(self, rule):
        """Ask the bus daemon to remove the given match rule (the
        counterpart of `add_match_string`). This version will block.

        :Parameters:
            `rule` : str
                The match rule
        :Raises `DBusException`: on error.
        :Since: 0.80.0
        """
        self.call_blocking(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                           BUS_DAEMON_IFACE, 'RemoveMatch', 's', (rule,))

    def remove_match_string_non_blocking(self, rule):
        """Ask the bus daemon to remove the given match rule (the
        counterpart of `add_match_string_non_blocking`). This version will
        not block, but any errors will be ignored.

        No reply or error handler is registered for the asynchronous
        ``RemoveMatch`` call, so a failure reported by the bus daemon is not
        delivered to the caller.

        :Parameters:
            `rule` : str
                The match rule
        :Since: 0.80.0
        """
        self.call_async(BUS_DAEMON_NAME, BUS_DAEMON_PATH,
                        BUS_DAEMON_IFACE, 'RemoveMatch', 's', (rule,),
                        None, None)