|
- #
- # Copyright (c) 2011 Thomas Graf <tgraf@suug.ch>
- #
- """Module providing access to network addresses
- """
- from __future__ import absolute_import
- __version__ = '1.0'
- __all__ = [
- 'AddressCache',
- 'Address']
- import datetime
- from .. import core as netlink
- from . import capi as capi
- from . import link as Link
- from .. import util as util
- class AddressCache(netlink.Cache):
- """Cache containing network addresses"""
- def __init__(self, cache=None):
- if not cache:
- cache = self._alloc_cache_name('route/addr')
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
- def __getitem__(self, key):
- # Using ifindex=0 here implies that the local address itself
- # is unique, otherwise the first occurence is returned.
- return self.lookup(0, key)
- def lookup(self, ifindex, local):
- if type(local) is str:
- local = netlink.AbstractAddress(local)
- addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
- local._nl_addr)
- if addr is None:
- raise KeyError()
- return Address._from_capi(addr)
- @staticmethod
- def _new_object(obj):
- return Address(obj)
- @staticmethod
- def _new_cache(cache):
- return AddressCache(cache=cache)
- class Address(netlink.Object):
- """Network address"""
- def __init__(self, obj=None):
- netlink.Object.__init__(self, 'route/addr', 'address', obj)
- self._rtnl_addr = self._obj2type(self._nl_object)
- @classmethod
- def _from_capi(cls, obj):
- return cls(capi.addr2obj(obj))
- @staticmethod
- def _obj2type(obj):
- return capi.obj2addr(obj)
- def __cmp__(self, other):
- # sort by:
- # 1. network link
- # 2. address family
- # 3. local address (including prefixlen)
- diff = self.ifindex - other.ifindex
- if diff == 0:
- diff = self.family - other.family
- if diff == 0:
- diff = capi.nl_addr_cmp(self.local, other.local)
- return diff
- @staticmethod
- def _new_instance(obj):
- return Address(obj)
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def ifindex(self):
- """interface index"""
- return capi.rtnl_addr_get_ifindex(self._rtnl_addr)
- @ifindex.setter
- def ifindex(self, value):
- link = Link.resolve(value)
- if not link:
- raise ValueError()
- self.link = link
- @property
- @netlink.nlattr(type=str, fmt=util.string)
- def link(self):
- link = capi.rtnl_addr_get_link(self._rtnl_addr)
- if not link:
- return None
- return Link.Link.from_capi(link)
- @link.setter
- def link(self, value):
- if type(value) is str:
- try:
- value = Link.resolve(value)
- except KeyError:
- raise ValueError()
- capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)
- # ifindex is immutable but we assume that if _orig does not
- # have an ifindex specified, it was meant to be given here
- if capi.rtnl_addr_get_ifindex(self._orig) == 0:
- capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)
- @property
- @netlink.nlattr(type=str, fmt=util.string)
- def label(self):
- """address label"""
- return capi.rtnl_addr_get_label(self._rtnl_addr)
- @label.setter
- def label(self, value):
- capi.rtnl_addr_set_label(self._rtnl_addr, value)
- @property
- @netlink.nlattr(type=str, fmt=util.string)
- def flags(self):
- """Flags
- Setting this property will *Not* reset flags to value you supply in
- Examples:
- addr.flags = '+xxx' # add xxx flag
- addr.flags = 'xxx' # exactly the same
- addr.flags = '-xxx' # remove xxx flag
- addr.flags = [ '+xxx', '-yyy' ] # list operation
- """
- flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
- return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')
- def _set_flag(self, flag):
- if flag.startswith('-'):
- i = capi.rtnl_addr_str2flags(flag[1:])
- capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
- elif flag.startswith('+'):
- i = capi.rtnl_addr_str2flags(flag[1:])
- capi.rtnl_addr_set_flags(self._rtnl_addr, i)
- else:
- i = capi.rtnl_addr_str2flags(flag)
- capi.rtnl_addr_set_flags(self._rtnl_addr, i)
- @flags.setter
- def flags(self, value):
- if type(value) is list:
- for flag in value:
- self._set_flag(flag)
- else:
- self._set_flag(value)
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def family(self):
- """Address family"""
- fam = capi.rtnl_addr_get_family(self._rtnl_addr)
- return netlink.AddressFamily(fam)
- @family.setter
- def family(self, value):
- if not isinstance(value, netlink.AddressFamily):
- value = netlink.AddressFamily(value)
- capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
- @property
- @netlink.nlattr(type=int, fmt=util.num)
- def scope(self):
- """Address scope"""
- scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
- return capi.rtnl_scope2str(scope, 32)[0]
- @scope.setter
- def scope(self, value):
- if type(value) is str:
- value = capi.rtnl_str2scope(value)
- capi.rtnl_addr_set_scope(self._rtnl_addr, value)
- @property
- @netlink.nlattr(type=str, immutable=True, fmt=util.addr)
- def local(self):
- """Local address"""
- a = capi.rtnl_addr_get_local(self._rtnl_addr)
- return netlink.AbstractAddress(a)
- @local.setter
- def local(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)
- # local is immutable but we assume that if _orig does not
- # have a local address specified, it was meant to be given here
- if capi.rtnl_addr_get_local(self._orig) is None:
- capi.rtnl_addr_set_local(self._orig, a._nl_addr)
- @property
- @netlink.nlattr(type=str, fmt=util.addr)
- def peer(self):
- """Peer address"""
- a = capi.rtnl_addr_get_peer(self._rtnl_addr)
- return netlink.AbstractAddress(a)
- @peer.setter
- def peer(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)
- @property
- @netlink.nlattr(type=str, fmt=util.addr)
- def broadcast(self):
- """Broadcast address"""
- a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
- return netlink.AbstractAddress(a)
- @broadcast.setter
- def broadcast(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)
- @property
- @netlink.nlattr(type=str, fmt=util.addr)
- def multicast(self):
- """multicast address"""
- a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
- return netlink.AbstractAddress(a)
- @multicast.setter
- def multicast(self, value):
- try:
- a = netlink.AbstractAddress(value)
- except ValueError as err:
- raise AttributeError('multicast', err)
- capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)
- @property
- @netlink.nlattr(type=str, fmt=util.addr)
- def anycast(self):
- """anycast address"""
- a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
- return netlink.AbstractAddress(a)
- @anycast.setter
- def anycast(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def valid_lifetime(self):
- """Valid lifetime"""
- msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
- if msecs == 0xFFFFFFFF:
- return None
- else:
- return datetime.timedelta(seconds=msecs)
- @valid_lifetime.setter
- def valid_lifetime(self, value):
- capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def preferred_lifetime(self):
- """Preferred lifetime"""
- msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
- if msecs == 0xFFFFFFFF:
- return None
- else:
- return datetime.timedelta(seconds=msecs)
- @preferred_lifetime.setter
- def preferred_lifetime(self, value):
- capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def create_time(self):
- """Creation time"""
- hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
- return datetime.timedelta(milliseconds=10*hsec)
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def last_update(self):
- """Last update"""
- hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
- return datetime.timedelta(milliseconds=10*hsec)
- def add(self, socket=None, flags=None):
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- if not flags:
- flags = netlink.NLM_F_CREATE
- ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
- def delete(self, socket, flags=0):
- """Attempt to delete this address in the kernel"""
- ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
- ###################################################################
- # private properties
- #
- # Used for formatting output. USE AT OWN RISK
- @property
- def _flags(self):
- return ','.join(self.flags)
- def format(self, details=False, stats=False, nodev=False, indent=''):
- """Return address as formatted text"""
- fmt = util.MyFormatter(self, indent)
- buf = fmt.format('{a|local!b}')
- if not nodev:
- buf += fmt.format(' {a|ifindex}')
- buf += fmt.format(' {a|scope}')
- if self.label:
- buf += fmt.format(' "{a|label}"')
- buf += fmt.format(' <{a|_flags}>')
- if details:
- buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
- + fmt.nl('\t{t|peer} {t|anycast}')
- if self.valid_lifetime:
- buf += fmt.nl('\t{s|valid-lifetime!k} '\
- '{a|valid_lifetime}')
- if self.preferred_lifetime:
- buf += fmt.nl('\t{s|preferred-lifetime!k} '\
- '{a|preferred_lifetime}')
- if stats and (self.create_time or self.last_update):
- buf += self.nl('\t{s|created!k} {a|create_time}'\
- ' {s|last-updated!k} {a|last_update}')
- return buf
|