asyncore.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. # -*- Mode: Python -*-
  2. # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
  3. # Author: Sam Rushing <rushing@nightmare.com>
  4. # ======================================================================
  5. # Copyright 1996 by Sam Rushing
  6. #
  7. # All Rights Reserved
  8. #
  9. # Permission to use, copy, modify, and distribute this software and
  10. # its documentation for any purpose and without fee is hereby
  11. # granted, provided that the above copyright notice appear in all
  12. # copies and that both that copyright notice and this permission
  13. # notice appear in supporting documentation, and that the name of Sam
  14. # Rushing not be used in advertising or publicity pertaining to
  15. # distribution of the software without specific, written prior
  16. # permission.
  17. #
  18. # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
  19. # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
  20. # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
  21. # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
  22. # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  23. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
  24. # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  25. # ======================================================================
  26. """Basic infrastructure for asynchronous socket service clients and servers.
  27. There are only two ways to have a program on a single processor do "more
  28. than one thing at a time". Multi-threaded programming is the simplest and
  29. most popular way to do it, but there is another very different technique,
  30. that lets you have nearly all the advantages of multi-threading, without
  31. actually using multiple threads. It's really only practical if your program
  32. is largely I/O bound. If your program is CPU bound, then pre-emptive
  33. scheduled threads are probably what you really need. Network servers are
  34. rarely CPU-bound, however.
  35. If your operating system supports the select() system call in its I/O
  36. library (and nearly all do), then you can use it to juggle multiple
  37. communication channels at once; doing other work while your I/O is taking
  38. place in the "background." Although this strategy can seem strange and
  39. complex, especially at first, it is in many ways easier to understand and
  40. control than multi-threaded programming. The module documented here solves
  41. many of the difficult problems for you, making the task of building
  42. sophisticated high-performance network servers and clients a snap.
  43. """
  44. import select
  45. import socket
  46. import sys
  47. import time
  48. import warnings
  49. import os
  50. from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
  51. ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
  52. errorcode
# errno values that send(), recv() and readwrite() treat as "the connection
# is gone": they answer these with handle_close() rather than handle_error().
_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF))

# Default channel map (file descriptor -> dispatcher).  The bare-name probe
# keeps an already existing ``socket_map`` instead of replacing it with a
# fresh dict.
try:
    socket_map
except NameError:
    socket_map = {}
  59. def _strerror(err):
  60. try:
  61. return os.strerror(err)
  62. except (ValueError, OverflowError, NameError):
  63. if err in errorcode:
  64. return errorcode[err]
  65. return "Unknown error %s" %err
  66. class ExitNow(Exception):
  67. pass
# Exceptions the event-dispatch helpers never swallow: they are re-raised to
# the caller instead of being passed to obj.handle_error().
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
  69. def read(obj):
  70. try:
  71. obj.handle_read_event()
  72. except _reraised_exceptions:
  73. raise
  74. except:
  75. obj.handle_error()
  76. def write(obj):
  77. try:
  78. obj.handle_write_event()
  79. except _reraised_exceptions:
  80. raise
  81. except:
  82. obj.handle_error()
  83. def _exception(obj):
  84. try:
  85. obj.handle_expt_event()
  86. except _reraised_exceptions:
  87. raise
  88. except:
  89. obj.handle_error()
  90. def readwrite(obj, flags):
  91. try:
  92. if flags & select.POLLIN:
  93. obj.handle_read_event()
  94. if flags & select.POLLOUT:
  95. obj.handle_write_event()
  96. if flags & select.POLLPRI:
  97. obj.handle_expt_event()
  98. if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
  99. obj.handle_close()
  100. except socket.error, e:
  101. if e.args[0] not in _DISCONNECTED:
  102. obj.handle_error()
  103. else:
  104. obj.handle_close()
  105. except _reraised_exceptions:
  106. raise
  107. except:
  108. obj.handle_error()
  109. def poll(timeout=0.0, map=None):
  110. if map is None:
  111. map = socket_map
  112. if map:
  113. r = []; w = []; e = []
  114. for fd, obj in map.items():
  115. is_r = obj.readable()
  116. is_w = obj.writable()
  117. if is_r:
  118. r.append(fd)
  119. # accepting sockets should not be writable
  120. if is_w and not obj.accepting:
  121. w.append(fd)
  122. if is_r or is_w:
  123. e.append(fd)
  124. if [] == r == w == e:
  125. time.sleep(timeout)
  126. return
  127. try:
  128. r, w, e = select.select(r, w, e, timeout)
  129. except select.error, err:
  130. if err.args[0] != EINTR:
  131. raise
  132. else:
  133. return
  134. for fd in r:
  135. obj = map.get(fd)
  136. if obj is None:
  137. continue
  138. read(obj)
  139. for fd in w:
  140. obj = map.get(fd)
  141. if obj is None:
  142. continue
  143. write(obj)
  144. for fd in e:
  145. obj = map.get(fd)
  146. if obj is None:
  147. continue
  148. _exception(obj)
  149. def poll2(timeout=0.0, map=None):
  150. # Use the poll() support added to the select module in Python 2.0
  151. if map is None:
  152. map = socket_map
  153. if timeout is not None:
  154. # timeout is in milliseconds
  155. timeout = int(timeout*1000)
  156. pollster = select.poll()
  157. if map:
  158. for fd, obj in map.items():
  159. flags = 0
  160. if obj.readable():
  161. flags |= select.POLLIN | select.POLLPRI
  162. # accepting sockets should not be writable
  163. if obj.writable() and not obj.accepting:
  164. flags |= select.POLLOUT
  165. if flags:
  166. # Only check for exceptions if object was either readable
  167. # or writable.
  168. flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
  169. pollster.register(fd, flags)
  170. try:
  171. r = pollster.poll(timeout)
  172. except select.error, err:
  173. if err.args[0] != EINTR:
  174. raise
  175. r = []
  176. for fd, flags in r:
  177. obj = map.get(fd)
  178. if obj is None:
  179. continue
  180. readwrite(obj, flags)
  181. poll3 = poll2 # Alias for backward compatibility
  182. def loop(timeout=30.0, use_poll=False, map=None, count=None):
  183. if map is None:
  184. map = socket_map
  185. if use_poll and hasattr(select, 'poll'):
  186. poll_fun = poll2
  187. else:
  188. poll_fun = poll
  189. if count is None:
  190. while map:
  191. poll_fun(timeout, map)
  192. else:
  193. while map and count > 0:
  194. poll_fun(timeout, map)
  195. count = count - 1
  196. class dispatcher:
  197. debug = False
  198. connected = False
  199. accepting = False
  200. connecting = False
  201. closing = False
  202. addr = None
  203. ignore_log_types = frozenset(['warning'])
  204. def __init__(self, sock=None, map=None):
  205. if map is None:
  206. self._map = socket_map
  207. else:
  208. self._map = map
  209. self._fileno = None
  210. if sock:
  211. # Set to nonblocking just to make sure for cases where we
  212. # get a socket from a blocking source.
  213. sock.setblocking(0)
  214. self.set_socket(sock, map)
  215. self.connected = True
  216. # The constructor no longer requires that the socket
  217. # passed be connected.
  218. try:
  219. self.addr = sock.getpeername()
  220. except socket.error, err:
  221. if err.args[0] in (ENOTCONN, EINVAL):
  222. # To handle the case where we got an unconnected
  223. # socket.
  224. self.connected = False
  225. else:
  226. # The socket is broken in some unknown way, alert
  227. # the user and remove it from the map (to prevent
  228. # polling of broken sockets).
  229. self.del_channel(map)
  230. raise
  231. else:
  232. self.socket = None
  233. def __repr__(self):
  234. status = [self.__class__.__module__+"."+self.__class__.__name__]
  235. if self.accepting and self.addr:
  236. status.append('listening')
  237. elif self.connected:
  238. status.append('connected')
  239. if self.addr is not None:
  240. try:
  241. status.append('%s:%d' % self.addr)
  242. except TypeError:
  243. status.append(repr(self.addr))
  244. return '<%s at %#x>' % (' '.join(status), id(self))
  245. __str__ = __repr__
  246. def add_channel(self, map=None):
  247. #self.log_info('adding channel %s' % self)
  248. if map is None:
  249. map = self._map
  250. map[self._fileno] = self
  251. def del_channel(self, map=None):
  252. fd = self._fileno
  253. if map is None:
  254. map = self._map
  255. if fd in map:
  256. #self.log_info('closing channel %d:%s' % (fd, self))
  257. del map[fd]
  258. self._fileno = None
  259. def create_socket(self, family, type):
  260. self.family_and_type = family, type
  261. sock = socket.socket(family, type)
  262. sock.setblocking(0)
  263. self.set_socket(sock)
  264. def set_socket(self, sock, map=None):
  265. self.socket = sock
  266. ## self.__dict__['socket'] = sock
  267. self._fileno = sock.fileno()
  268. self.add_channel(map)
  269. def set_reuse_addr(self):
  270. # try to re-use a server port if possible
  271. try:
  272. self.socket.setsockopt(
  273. socket.SOL_SOCKET, socket.SO_REUSEADDR,
  274. self.socket.getsockopt(socket.SOL_SOCKET,
  275. socket.SO_REUSEADDR) | 1
  276. )
  277. except socket.error:
  278. pass
  279. # ==================================================
  280. # predicates for select()
  281. # these are used as filters for the lists of sockets
  282. # to pass to select().
  283. # ==================================================
  284. def readable(self):
  285. return True
  286. def writable(self):
  287. return True
  288. # ==================================================
  289. # socket object methods.
  290. # ==================================================
  291. def listen(self, num):
  292. self.accepting = True
  293. if os.name == 'nt' and num > 5:
  294. num = 5
  295. return self.socket.listen(num)
  296. def bind(self, addr):
  297. self.addr = addr
  298. return self.socket.bind(addr)
  299. def connect(self, address):
  300. self.connected = False
  301. self.connecting = True
  302. err = self.socket.connect_ex(address)
  303. if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
  304. or err == EINVAL and os.name in ('nt', 'ce'):
  305. self.addr = address
  306. return
  307. if err in (0, EISCONN):
  308. self.addr = address
  309. self.handle_connect_event()
  310. else:
  311. raise socket.error(err, errorcode[err])
  312. def accept(self):
  313. # XXX can return either an address pair or None
  314. try:
  315. conn, addr = self.socket.accept()
  316. except TypeError:
  317. return None
  318. except socket.error as why:
  319. if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
  320. return None
  321. else:
  322. raise
  323. else:
  324. return conn, addr
  325. def send(self, data):
  326. try:
  327. result = self.socket.send(data)
  328. return result
  329. except socket.error, why:
  330. if why.args[0] == EWOULDBLOCK:
  331. return 0
  332. elif why.args[0] in _DISCONNECTED:
  333. self.handle_close()
  334. return 0
  335. else:
  336. raise
  337. def recv(self, buffer_size):
  338. try:
  339. data = self.socket.recv(buffer_size)
  340. if not data:
  341. # a closed connection is indicated by signaling
  342. # a read condition, and having recv() return 0.
  343. self.handle_close()
  344. return ''
  345. else:
  346. return data
  347. except socket.error, why:
  348. # winsock sometimes raises ENOTCONN
  349. if why.args[0] in _DISCONNECTED:
  350. self.handle_close()
  351. return ''
  352. else:
  353. raise
  354. def close(self):
  355. self.connected = False
  356. self.accepting = False
  357. self.connecting = False
  358. self.del_channel()
  359. try:
  360. self.socket.close()
  361. except socket.error, why:
  362. if why.args[0] not in (ENOTCONN, EBADF):
  363. raise
  364. # cheap inheritance, used to pass all other attribute
  365. # references to the underlying socket object.
  366. def __getattr__(self, attr):
  367. try:
  368. retattr = getattr(self.socket, attr)
  369. except AttributeError:
  370. raise AttributeError("%s instance has no attribute '%s'"
  371. %(self.__class__.__name__, attr))
  372. else:
  373. msg = "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \
  374. "instead." % {'me': self.__class__.__name__, 'attr':attr}
  375. warnings.warn(msg, DeprecationWarning, stacklevel=2)
  376. return retattr
  377. # log and log_info may be overridden to provide more sophisticated
  378. # logging and warning methods. In general, log is for 'hit' logging
  379. # and 'log_info' is for informational, warning and error logging.
  380. def log(self, message):
  381. sys.stderr.write('log: %s\n' % str(message))
  382. def log_info(self, message, type='info'):
  383. if type not in self.ignore_log_types:
  384. print '%s: %s' % (type, message)
  385. def handle_read_event(self):
  386. if self.accepting:
  387. # accepting sockets are never connected, they "spawn" new
  388. # sockets that are connected
  389. self.handle_accept()
  390. elif not self.connected:
  391. if self.connecting:
  392. self.handle_connect_event()
  393. self.handle_read()
  394. else:
  395. self.handle_read()
  396. def handle_connect_event(self):
  397. err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  398. if err != 0:
  399. raise socket.error(err, _strerror(err))
  400. self.handle_connect()
  401. self.connected = True
  402. self.connecting = False
  403. def handle_write_event(self):
  404. if self.accepting:
  405. # Accepting sockets shouldn't get a write event.
  406. # We will pretend it didn't happen.
  407. return
  408. if not self.connected:
  409. if self.connecting:
  410. self.handle_connect_event()
  411. self.handle_write()
  412. def handle_expt_event(self):
  413. # handle_expt_event() is called if there might be an error on the
  414. # socket, or if there is OOB data
  415. # check for the error condition first
  416. err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  417. if err != 0:
  418. # we can get here when select.select() says that there is an
  419. # exceptional condition on the socket
  420. # since there is an error, we'll go ahead and close the socket
  421. # like we would in a subclassed handle_read() that received no
  422. # data
  423. self.handle_close()
  424. else:
  425. self.handle_expt()
  426. def handle_error(self):
  427. nil, t, v, tbinfo = compact_traceback()
  428. # sometimes a user repr method will crash.
  429. try:
  430. self_repr = repr(self)
  431. except:
  432. self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
  433. self.log_info(
  434. 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
  435. self_repr,
  436. t,
  437. v,
  438. tbinfo
  439. ),
  440. 'error'
  441. )
  442. self.handle_close()
  443. def handle_expt(self):
  444. self.log_info('unhandled incoming priority event', 'warning')
  445. def handle_read(self):
  446. self.log_info('unhandled read event', 'warning')
  447. def handle_write(self):
  448. self.log_info('unhandled write event', 'warning')
  449. def handle_connect(self):
  450. self.log_info('unhandled connect event', 'warning')
  451. def handle_accept(self):
  452. self.log_info('unhandled accept event', 'warning')
  453. def handle_close(self):
  454. self.log_info('unhandled close event', 'warning')
  455. self.close()
  456. # ---------------------------------------------------------------------------
  457. # adds simple buffered output capability, useful for simple clients.
  458. # [for more sophisticated usage use asynchat.async_chat]
  459. # ---------------------------------------------------------------------------
  460. class dispatcher_with_send(dispatcher):
  461. def __init__(self, sock=None, map=None):
  462. dispatcher.__init__(self, sock, map)
  463. self.out_buffer = ''
  464. def initiate_send(self):
  465. num_sent = 0
  466. num_sent = dispatcher.send(self, self.out_buffer[:512])
  467. self.out_buffer = self.out_buffer[num_sent:]
  468. def handle_write(self):
  469. self.initiate_send()
  470. def writable(self):
  471. return (not self.connected) or len(self.out_buffer)
  472. def send(self, data):
  473. if self.debug:
  474. self.log_info('sending %s' % repr(data))
  475. self.out_buffer = self.out_buffer + data
  476. self.initiate_send()
  477. # ---------------------------------------------------------------------------
  478. # used for debugging.
  479. # ---------------------------------------------------------------------------
  480. def compact_traceback():
  481. t, v, tb = sys.exc_info()
  482. tbinfo = []
  483. if not tb: # Must have a traceback
  484. raise AssertionError("traceback does not exist")
  485. while tb:
  486. tbinfo.append((
  487. tb.tb_frame.f_code.co_filename,
  488. tb.tb_frame.f_code.co_name,
  489. str(tb.tb_lineno)
  490. ))
  491. tb = tb.tb_next
  492. # just to be safe
  493. del tb
  494. file, function, line = tbinfo[-1]
  495. info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
  496. return (file, function, line), t, v, info
  497. def close_all(map=None, ignore_all=False):
  498. if map is None:
  499. map = socket_map
  500. for x in map.values():
  501. try:
  502. x.close()
  503. except OSError, x:
  504. if x.args[0] == EBADF:
  505. pass
  506. elif not ignore_all:
  507. raise
  508. except _reraised_exceptions:
  509. raise
  510. except:
  511. if not ignore_all:
  512. raise
  513. map.clear()
  514. # Asynchronous File I/O:
  515. #
  516. # After a little research (reading man pages on various unixen, and
  517. # digging through the linux kernel), I've determined that select()
  518. # isn't meant for doing asynchronous file i/o.
  519. # Heartening, though - reading linux/mm/filemap.c shows that linux
  520. # supports asynchronous read-ahead. So _MOST_ of the time, the data
  521. # will be sitting in memory for us already when we go to read it.
  522. #
  523. # What other OS's (besides NT) support async file i/o? [VMS?]
  524. #
  525. # Regardless, this is useful for pipes, and stdin/stdout...
  526. if os.name == 'posix':
  527. import fcntl
  528. class file_wrapper:
  529. # Here we override just enough to make a file
  530. # look like a socket for the purposes of asyncore.
  531. # The passed fd is automatically os.dup()'d
  532. def __init__(self, fd):
  533. self.fd = os.dup(fd)
  534. def recv(self, *args):
  535. return os.read(self.fd, *args)
  536. def send(self, *args):
  537. return os.write(self.fd, *args)
  538. def getsockopt(self, level, optname, buflen=None):
  539. if (level == socket.SOL_SOCKET and
  540. optname == socket.SO_ERROR and
  541. not buflen):
  542. return 0
  543. raise NotImplementedError("Only asyncore specific behaviour "
  544. "implemented.")
  545. read = recv
  546. write = send
  547. def close(self):
  548. os.close(self.fd)
  549. def fileno(self):
  550. return self.fd
  551. class file_dispatcher(dispatcher):
  552. def __init__(self, fd, map=None):
  553. dispatcher.__init__(self, None, map)
  554. self.connected = True
  555. try:
  556. fd = fd.fileno()
  557. except AttributeError:
  558. pass
  559. self.set_file(fd)
  560. # set it to non-blocking mode
  561. flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
  562. flags = flags | os.O_NONBLOCK
  563. fcntl.fcntl(fd, fcntl.F_SETFL, flags)
  564. def set_file(self, fd):
  565. self.socket = file_wrapper(fd)
  566. self._fileno = self.socket.fileno()
  567. self.add_channel()