|
- #
- # Copyright (c) 2011 Thomas Graf <tgraf@suug.ch>
- #
- """Module providing access to network links
- This module provides an interface to view configured network links,
- modify them and to add and delete virtual network links.
- The following is a basic example:
- import netlink.core as netlink
- import netlink.route.link as link
- sock = netlink.Socket()
- sock.connect(netlink.NETLINK_ROUTE)
- cache = link.LinkCache() # create new empty link cache
- cache.refill(sock) # fill cache with all configured links
- eth0 = cache['eth0'] # lookup link "eth0"
- print eth0 # print basic configuration
- The module contains the following public classes:
- - Link -- Represents a network link. Instances can be created directly
- via the constructor (empty link objects) or via the refill()
- method of a LinkCache.
- - LinkCache -- Derived from netlink.Cache, holds any number of
- network links (Link instances). Main purpose is to keep
- a local list of all network links configured in the
- kernel.
- The following public functions exist:
- - get_from_kernel(socket, name)
- """
- from __future__ import absolute_import
- __version__ = '0.1'
- __all__ = [
- 'LinkCache',
- 'Link',
- 'get_from_kernel',
- ]
- import socket
- from .. import core as netlink
- from .. import capi as core_capi
- from . import capi as capi
- from .links import inet as inet
- from .. import util as util
- # Link statistics definitions
- RX_PACKETS = 0
- TX_PACKETS = 1
- RX_BYTES = 2
- TX_BYTES = 3
- RX_ERRORS = 4
- TX_ERRORS = 5
- RX_DROPPED = 6
- TX_DROPPED = 7
- RX_COMPRESSED = 8
- TX_COMPRESSED = 9
- RX_FIFO_ERR = 10
- TX_FIFO_ERR = 11
- RX_LEN_ERR = 12
- RX_OVER_ERR = 13
- RX_CRC_ERR = 14
- RX_FRAME_ERR = 15
- RX_MISSED_ERR = 16
- TX_ABORT_ERR = 17
- TX_CARRIER_ERR = 18
- TX_HBEAT_ERR = 19
- TX_WIN_ERR = 20
- COLLISIONS = 21
- MULTICAST = 22
- IP6_INPKTS = 23
- IP6_INHDRERRORS = 24
- IP6_INTOOBIGERRORS = 25
- IP6_INNOROUTES = 26
- IP6_INADDRERRORS = 27
- IP6_INUNKNOWNPROTOS = 28
- IP6_INTRUNCATEDPKTS = 29
- IP6_INDISCARDS = 30
- IP6_INDELIVERS = 31
- IP6_OUTFORWDATAGRAMS = 32
- IP6_OUTPKTS = 33
- IP6_OUTDISCARDS = 34
- IP6_OUTNOROUTES = 35
- IP6_REASMTIMEOUT = 36
- IP6_REASMREQDS = 37
- IP6_REASMOKS = 38
- IP6_REASMFAILS = 39
- IP6_FRAGOKS = 40
- IP6_FRAGFAILS = 41
- IP6_FRAGCREATES = 42
- IP6_INMCASTPKTS = 43
- IP6_OUTMCASTPKTS = 44
- IP6_INBCASTPKTS = 45
- IP6_OUTBCASTPKTS = 46
- IP6_INOCTETS = 47
- IP6_OUTOCTETS = 48
- IP6_INMCASTOCTETS = 49
- IP6_OUTMCASTOCTETS = 50
- IP6_INBCASTOCTETS = 51
- IP6_OUTBCASTOCTETS = 52
- ICMP6_INMSGS = 53
- ICMP6_INERRORS = 54
- ICMP6_OUTMSGS = 55
- ICMP6_OUTERRORS = 56
- class LinkCache(netlink.Cache):
- """Cache of network links"""
- def __init__(self, family=socket.AF_UNSPEC, cache=None):
- if not cache:
- cache = self._alloc_cache_name('route/link')
- self._info_module = None
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
- self._set_arg1(family)
- def __getitem__(self, key):
- if type(key) is int:
- link = capi.rtnl_link_get(self._nl_cache, key)
- else:
- link = capi.rtnl_link_get_by_name(self._nl_cache, key)
- if link is None:
- raise KeyError()
- else:
- return Link.from_capi(link)
- @staticmethod
- def _new_object(obj):
- return Link(obj)
- def _new_cache(self, cache):
- return LinkCache(family=self.arg1, cache=cache)
- class Link(netlink.Object):
- """Network link"""
- def __init__(self, obj=None):
- netlink.Object.__init__(self, 'route/link', 'link', obj)
- self._rtnl_link = self._obj2type(self._nl_object)
- if self.type:
- self._module_lookup('netlink.route.links.' + self.type)
- self.inet = inet.InetLink(self)
- self.af = {'inet' : self.inet }
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_value, tb):
- if exc_type is None:
- self.change()
- else:
- return false
- @classmethod
- def from_capi(cls, obj):
- return cls(capi.link2obj(obj))
- @staticmethod
- def _obj2type(obj):
- return capi.obj2link(obj)
- def __cmp__(self, other):
- return self.ifindex - other.ifindex
- @staticmethod
- def _new_instance(obj):
- if not obj:
- raise ValueError()
- return Link(obj)
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def ifindex(self):
- """interface index"""
- return capi.rtnl_link_get_ifindex(self._rtnl_link)
- @ifindex.setter
- def ifindex(self, value):
- capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))
- # ifindex is immutable but we assume that if _orig does not
- # have an ifindex specified, it was meant to be given here
- if capi.rtnl_link_get_ifindex(self._orig) == 0:
- capi.rtnl_link_set_ifindex(self._orig, int(value))
- @property
- @netlink.nlattr(type=str, fmt=util.bold)
- def name(self):
- """Name of link"""
- return capi.rtnl_link_get_name(self._rtnl_link)
- @name.setter
- def name(self, value):
- capi.rtnl_link_set_name(self._rtnl_link, value)
- # name is the secondary identifier, if _orig does not have
- # the name specified yet, assume it was meant to be specified
- # here. ifindex will always take priority, therefore if ifindex
- # is specified as well, this will be ignored automatically.
- if capi.rtnl_link_get_name(self._orig) is None:
- capi.rtnl_link_set_name(self._orig, value)
- @property
- @netlink.nlattr(type=str, fmt=util.string)
- def flags(self):
- """Flags
- Setting this property will *Not* reset flags to value you supply in
- Examples:
- link.flags = '+xxx' # add xxx flag
- link.flags = 'xxx' # exactly the same
- link.flags = '-xxx' # remove xxx flag
- link.flags = [ '+xxx', '-yyy' ] # list operation
- """
- flags = capi.rtnl_link_get_flags(self._rtnl_link)
- return capi.rtnl_link_flags2str(flags, 256)[0].split(',')
- def _set_flag(self, flag):
- if flag.startswith('-'):
- i = capi.rtnl_link_str2flags(flag[1:])
- capi.rtnl_link_unset_flags(self._rtnl_link, i)
- elif flag.startswith('+'):
- i = capi.rtnl_link_str2flags(flag[1:])
- capi.rtnl_link_set_flags(self._rtnl_link, i)
- else:
- i = capi.rtnl_link_str2flags(flag)
- capi.rtnl_link_set_flags(self._rtnl_link, i)
- @flags.setter
- def flags(self, value):
- if not (type(value) is str):
- for flag in value:
- self._set_flag(flag)
- else:
- self._set_flag(value)
- @property
- @netlink.nlattr(type=int, fmt=util.num)
- def mtu(self):
- """Maximum Transmission Unit"""
- return capi.rtnl_link_get_mtu(self._rtnl_link)
- @mtu.setter
- def mtu(self, value):
- capi.rtnl_link_set_mtu(self._rtnl_link, int(value))
- @property
- @netlink.nlattr(type=int, immutable=True, fmt=util.num)
- def family(self):
- """Address family"""
- return capi.rtnl_link_get_family(self._rtnl_link)
- @family.setter
- def family(self, value):
- capi.rtnl_link_set_family(self._rtnl_link, value)
- @property
- @netlink.nlattr(type=str, fmt=util.addr)
- def address(self):
- """Hardware address (MAC address)"""
- a = capi.rtnl_link_get_addr(self._rtnl_link)
- return netlink.AbstractAddress(a)
- @address.setter
- def address(self, value):
- capi.rtnl_link_set_addr(self._rtnl_link, value._addr)
- @property
- @netlink.nlattr(type=str, fmt=util.addr)
- def broadcast(self):
- """Hardware broadcast address"""
- a = capi.rtnl_link_get_broadcast(self._rtnl_link)
- return netlink.AbstractAddress(a)
- @broadcast.setter
- def broadcast(self, value):
- capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)
- @property
- @netlink.nlattr(type=str, immutable=True, fmt=util.string)
- def qdisc(self):
- """Name of qdisc (cannot be changed)"""
- return capi.rtnl_link_get_qdisc(self._rtnl_link)
- @qdisc.setter
- def qdisc(self, value):
- capi.rtnl_link_set_qdisc(self._rtnl_link, value)
- @property
- @netlink.nlattr(type=int, fmt=util.num)
- def txqlen(self):
- """Length of transmit queue"""
- return capi.rtnl_link_get_txqlen(self._rtnl_link)
- @txqlen.setter
- def txqlen(self, value):
- capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))
- @property
- @netlink.nlattr(type=str, immutable=True, fmt=util.string)
- def arptype(self):
- """Type of link (cannot be changed)"""
- type_ = capi.rtnl_link_get_arptype(self._rtnl_link)
- return core_capi.nl_llproto2str(type_, 64)[0]
- @arptype.setter
- def arptype(self, value):
- i = core_capi.nl_str2llproto(value)
- capi.rtnl_link_set_arptype(self._rtnl_link, i)
- @property
- @netlink.nlattr(type=str, immutable=True, fmt=util.string, title='state')
- def operstate(self):
- """Operational status"""
- operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
- return capi.rtnl_link_operstate2str(operstate, 32)[0]
- @operstate.setter
- def operstate(self, value):
- i = capi.rtnl_link_str2operstate(value)
- capi.rtnl_link_set_operstate(self._rtnl_link, i)
- @property
- @netlink.nlattr(type=str, immutable=True, fmt=util.string)
- def mode(self):
- """Link mode"""
- mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
- return capi.rtnl_link_mode2str(mode, 32)[0]
- @mode.setter
- def mode(self, value):
- i = capi.rtnl_link_str2mode(value)
- capi.rtnl_link_set_linkmode(self._rtnl_link, i)
- @property
- @netlink.nlattr(type=str, fmt=util.string)
- def alias(self):
- """Interface alias (SNMP)"""
- return capi.rtnl_link_get_ifalias(self._rtnl_link)
- @alias.setter
- def alias(self, value):
- capi.rtnl_link_set_ifalias(self._rtnl_link, value)
- @property
- @netlink.nlattr(type=str, fmt=util.string)
- def type(self):
- """Link type"""
- return capi.rtnl_link_get_type(self._rtnl_link)
- @type.setter
- def type(self, value):
- if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
- raise NameError('unknown info type')
- self._module_lookup('netlink.route.links.' + value)
- def get_stat(self, stat):
- """Retrieve statistical information"""
- if type(stat) is str:
- stat = capi.rtnl_link_str2stat(stat)
- if stat < 0:
- raise NameError('unknown name of statistic')
- return capi.rtnl_link_get_stat(self._rtnl_link, stat)
- def enslave(self, slave, sock=None):
- if not sock:
- sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- return capi.rtnl_link_enslave(sock._sock, self._rtnl_link, slave._rtnl_link)
- def release(self, slave, sock=None):
- if not sock:
- sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- return capi.rtnl_link_release(sock._sock, self._rtnl_link, slave._rtnl_link)
- def add(self, sock=None, flags=None):
- if not sock:
- sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- if not flags:
- flags = netlink.NLM_F_CREATE
- ret = capi.rtnl_link_add(sock._sock, self._rtnl_link, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
- def change(self, sock=None, flags=0):
- """Commit changes made to the link object"""
- if sock is None:
- sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- if not self._orig:
- raise netlink.NetlinkError('Original link not available')
- ret = capi.rtnl_link_change(sock._sock, self._orig, self._rtnl_link, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
- def delete(self, sock=None):
- """Attempt to delete this link in the kernel"""
- if sock is None:
- sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- ret = capi.rtnl_link_delete(sock._sock, self._rtnl_link)
- if ret < 0:
- raise netlink.KernelError(ret)
- ###################################################################
- # private properties
- #
- # Used for formatting output. USE AT OWN RISK
- @property
- def _state(self):
- if 'up' in self.flags:
- buf = util.good('up')
- if 'lowerup' not in self.flags:
- buf += ' ' + util.bad('no-carrier')
- else:
- buf = util.bad('down')
- return buf
- @property
- def _brief(self):
- return self._module_brief() + self._foreach_af('brief')
- @property
- def _flags(self):
- ignore = [
- 'up',
- 'running',
- 'lowerup',
- ]
- return ','.join([flag for flag in self.flags if flag not in ignore])
- def _foreach_af(self, name, args=None):
- buf = ''
- for af in self.af:
- try:
- func = getattr(self.af[af], name)
- s = str(func(args))
- if len(s) > 0:
- buf += ' ' + s
- except AttributeError:
- pass
- return buf
- def format(self, details=False, stats=False, indent=''):
- """Return link as formatted text"""
- fmt = util.MyFormatter(self, indent)
- buf = fmt.format('{a|ifindex} {a|name} {a|arptype} {a|address} '\
- '{a|_state} <{a|_flags}> {a|_brief}')
- if details:
- buf += fmt.nl('\t{t|mtu} {t|txqlen} {t|weight} '\
- '{t|qdisc} {t|operstate}')
- buf += fmt.nl('\t{t|broadcast} {t|alias}')
- buf += self._foreach_af('details', fmt)
- if stats:
- l = [['Packets', RX_PACKETS, TX_PACKETS],
- ['Bytes', RX_BYTES, TX_BYTES],
- ['Errors', RX_ERRORS, TX_ERRORS],
- ['Dropped', RX_DROPPED, TX_DROPPED],
- ['Compressed', RX_COMPRESSED, TX_COMPRESSED],
- ['FIFO Errors', RX_FIFO_ERR, TX_FIFO_ERR],
- ['Length Errors', RX_LEN_ERR, None],
- ['Over Errors', RX_OVER_ERR, None],
- ['CRC Errors', RX_CRC_ERR, None],
- ['Frame Errors', RX_FRAME_ERR, None],
- ['Missed Errors', RX_MISSED_ERR, None],
- ['Abort Errors', None, TX_ABORT_ERR],
- ['Carrier Errors', None, TX_CARRIER_ERR],
- ['Heartbeat Errors', None, TX_HBEAT_ERR],
- ['Window Errors', None, TX_WIN_ERR],
- ['Collisions', None, COLLISIONS],
- ['Multicast', None, MULTICAST],
- ['', None, None],
- ['Ipv6:', None, None],
- ['Packets', IP6_INPKTS, IP6_OUTPKTS],
- ['Bytes', IP6_INOCTETS, IP6_OUTOCTETS],
- ['Discards', IP6_INDISCARDS, IP6_OUTDISCARDS],
- ['Multicast Packets', IP6_INMCASTPKTS, IP6_OUTMCASTPKTS],
- ['Multicast Bytes', IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS],
- ['Broadcast Packets', IP6_INBCASTPKTS, IP6_OUTBCASTPKTS],
- ['Broadcast Bytes', IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS],
- ['Delivers', IP6_INDELIVERS, None],
- ['Forwarded', None, IP6_OUTFORWDATAGRAMS],
- ['No Routes', IP6_INNOROUTES, IP6_OUTNOROUTES],
- ['Header Errors', IP6_INHDRERRORS, None],
- ['Too Big Errors', IP6_INTOOBIGERRORS, None],
- ['Address Errors', IP6_INADDRERRORS, None],
- ['Unknown Protocol', IP6_INUNKNOWNPROTOS, None],
- ['Truncated Packets', IP6_INTRUNCATEDPKTS, None],
- ['Reasm Timeouts', IP6_REASMTIMEOUT, None],
- ['Reasm Requests', IP6_REASMREQDS, None],
- ['Reasm Failures', IP6_REASMFAILS, None],
- ['Reasm OK', IP6_REASMOKS, None],
- ['Frag Created', None, IP6_FRAGCREATES],
- ['Frag Failures', None, IP6_FRAGFAILS],
- ['Frag OK', None, IP6_FRAGOKS],
- ['', None, None],
- ['ICMPv6:', None, None],
- ['Messages', ICMP6_INMSGS, ICMP6_OUTMSGS],
- ['Errors', ICMP6_INERRORS, ICMP6_OUTERRORS]]
- buf += '\n\t%s%s%s%s\n' % (33 * ' ', util.title('RX'),
- 15 * ' ', util.title('TX'))
- for row in l:
- row[0] = util.kw(row[0])
- row[1] = self.get_stat(row[1]) if row[1] else ''
- row[2] = self.get_stat(row[2]) if row[2] else ''
- buf += '\t{0[0]:27} {0[1]:>16} {0[2]:>16}\n'.format(row)
- buf += self._foreach_af('stats')
- return buf
- def get(name, sock=None):
- """Lookup Link object directly from kernel"""
- if not name:
- raise ValueError()
- if not sock:
- sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- link = capi.get_from_kernel(sock._sock, 0, name)
- if not link:
- return None
- return Link.from_capi(link)
- _link_cache = LinkCache()
- def resolve(name):
- _link_cache.refill()
- return _link_cache[name]
|