From 51f80c265a23eedadd9ae5564ca7eae1ce2350c1 Mon Sep 17 00:00:00 2001 From: Jeremy Fincher Date: Tue, 8 Apr 2003 07:04:57 +0000 Subject: [PATCH] Upgrade to asyncore from 2.3; removed fcntl dependent stuff (since I don't have it on my system). --- others/asyncore.py | 370 +++++++++++++++++++-------------------------- 1 file changed, 156 insertions(+), 214 deletions(-) diff --git a/others/asyncore.py b/others/asyncore.py index dac5392c0..6d1d48057 100644 --- a/others/asyncore.py +++ b/others/asyncore.py @@ -50,6 +50,7 @@ import exceptions import select import socket import sys +import time import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ @@ -60,100 +61,103 @@ try: except NameError: socket_map = {} -class ExitNow (exceptions.Exception): +class ExitNow(exceptions.Exception): pass -DEBUG = 0 +def read(obj): + try: + obj.handle_read_event() + except ExitNow: + raise + except: + obj.handle_error() -def poll (timeout=0.0, map=None): +def write(obj): + try: + obj.handle_write_event() + except ExitNow: + raise + except: + obj.handle_error() + +def readwrite(obj, flags): + try: + if flags & select.POLLIN: + obj.handle_read_event() + if flags & select.POLLOUT: + obj.handle_write_event() + except ExitNow: + raise + except: + obj.handle_error() + +def poll(timeout=0.0, map=None): if map is None: map = socket_map if map: r = []; w = []; e = [] - for fd, obj in map.iteritems(): + for fd, obj in map.items(): if obj.readable(): - r.append (fd) + r.append(fd) if obj.writable(): - w.append (fd) - try: - r,w,e = select.select (r,w,e, timeout) - except select.error, err: - if err[0] != EINTR: - raise - - if DEBUG: - print r,w,e + w.append(fd) + if [] == r == w == e: + time.sleep(timeout) + else: + try: + r, w, e = select.select(r, w, e, timeout) + except select.error, err: + if err[0] != EINTR: + raise + else: + return for fd in r: - try: - obj = map[fd] - except KeyError: + obj = map.get(fd) + if obj is None: continue - - try: - obj.handle_read_event() - except ExitNow: - raise ExitNow - except: - obj.handle_error() + read(obj) for fd in w: - try: - obj = map[fd] - except KeyError: + obj = map.get(fd) + if obj is None: continue + write(obj) - try: - obj.handle_write_event() - except ExitNow: - raise ExitNow - except: - obj.handle_error() - -def poll2 (timeout=0.0, map=None): +def poll2(timeout=0.0, map=None): import poll if map is None: - map=socket_map + map = socket_map if timeout is not None: # timeout is in milliseconds timeout = int(timeout*1000) if map: l = [] - for fd, obj in map.iteritems(): + for fd, obj in map.items(): flags = 0 if obj.readable(): flags = poll.POLLIN if obj.writable(): flags = flags | poll.POLLOUT if flags: - l.append ((fd, flags)) - r = poll.poll (l, timeout) + l.append((fd, flags)) + r = poll.poll(l, timeout) for fd, flags in r: - try: - obj = map[fd] - except KeyError: + obj = map.get(fd) + if obj is None: continue + readwrite(obj, flags) - try: - if (flags & poll.POLLIN): - obj.handle_read_event() - if (flags & poll.POLLOUT): - obj.handle_write_event() - except ExitNow: - raise ExitNow - except: - obj.handle_error() - -def poll3 (timeout=0.0, map=None): +def poll3(timeout=0.0, map=None): # Use the poll() support added to the select module in Python 2.0 if map is None: - map=socket_map + map = socket_map if timeout is not None: # timeout is in milliseconds timeout = int(timeout*1000) pollster = select.poll() if map: - for fd, obj in map.iteritems(): + for fd, obj in map.items(): flags = 0 if obj.readable(): flags = select.POLLIN @@ -162,34 +166,23 @@ def poll3 (timeout=0.0, map=None): if flags: pollster.register(fd, flags) try: - r = pollster.poll (timeout) + r = pollster.poll(timeout) except select.error, err: if err[0] != EINTR: raise r = [] for fd, flags in r: - try: - obj = map[fd] - except KeyError: + obj = map.get(fd) + if obj is None: continue + readwrite(obj, flags) - try: - if (flags & select.POLLIN): - obj.handle_read_event() - if (flags & select.POLLOUT): - obj.handle_write_event() - except ExitNow: - raise ExitNow - except: - obj.handle_error() - -def loop (timeout=30.0, use_poll=0, map=None): - +def loop(timeout=30.0, use_poll=0, map=None): if map is None: - map=socket_map + map = socket_map if use_poll: - if hasattr (select, 'poll'): + if hasattr(select, 'poll'): poll_fun = poll3 else: poll_fun = poll2 @@ -197,20 +190,21 @@ def loop (timeout=30.0, use_poll=0, map=None): poll_fun = poll while map: - poll_fun (timeout, map) + poll_fun(timeout, map) class dispatcher: + debug = 0 connected = 0 accepting = 0 closing = 0 addr = None - def __init__ (self, sock=None, map=None): + def __init__(self, sock=None, map=None): if sock: - self.set_socket (sock, map) + self.set_socket(sock, map) # I think it should inherit this anyway - self.socket.setblocking (0) + self.socket.setblocking(0) self.connected = 1 # XXX Does the constructor require that the socket passed # be connected? @@ -222,53 +216,53 @@ class dispatcher: else: self.socket = None - def __repr__ (self): + def __repr__(self): status = [self.__class__.__module__+"."+self.__class__.__name__] if self.accepting and self.addr: - status.append ('listening') + status.append('listening') elif self.connected: - status.append ('connected') + status.append('connected') if self.addr is not None: try: - status.append ('%s:%d' % self.addr) + status.append('%s:%d' % self.addr) except TypeError: - status.append (repr(self.addr)) - return '<%s at %#x>' % (' '.join (status), id (self)) + status.append(repr(self.addr)) + return '<%s at %#x>' % (' '.join(status), id(self)) - def add_channel (self, map=None): - #self.log_info ('adding channel %s' % self) + def add_channel(self, map=None): + #self.log_info('adding channel %s' % self) if map is None: - map=socket_map - map [self._fileno] = self + map = socket_map + map[self._fileno] = self - def del_channel (self, map=None): + def del_channel(self, map=None): fd = self._fileno if map is None: - map=socket_map - if map.has_key (fd): - #self.log_info ('closing channel %d:%s' % (fd, self)) - del map [fd] + map = socket_map + if map.has_key(fd): + #self.log_info('closing channel %d:%s' % (fd, self)) + del map[fd] - def create_socket (self, family, type): + def create_socket(self, family, type): self.family_and_type = family, type - self.socket = socket.socket (family, type) + self.socket = socket.socket(family, type) self.socket.setblocking(0) self._fileno = self.socket.fileno() self.add_channel() - def set_socket (self, sock, map=None): + def set_socket(self, sock, map=None): self.socket = sock ## self.__dict__['socket'] = sock self._fileno = sock.fileno() - self.add_channel (map) + self.add_channel(map) - def set_reuse_addr (self): + def set_reuse_addr(self): # try to re-use a server port if possible try: - self.socket.setsockopt ( + self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, - self.socket.getsockopt (socket.SOL_SOCKET, - socket.SO_REUSEADDR) | 1 + self.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR) | 1 ) except socket.error: pass @@ -279,35 +273,36 @@ class dispatcher: # to pass to select(). # ================================================== - def readable (self): - return 1 + def readable(self): + return True if os.name == 'mac': # The macintosh will select a listening socket for # write if you let it. What might this mean? - def writable (self): + def writable(self): return not self.accepting else: - def writable (self): - return 1 + def writable(self): + return True # ================================================== # socket object methods. # ================================================== - def listen (self, num): + def listen(self, num): self.accepting = 1 if os.name == 'nt' and num > 5: num = 1 - return self.socket.listen (num) + return self.socket.listen(num) - def bind (self, addr): + def bind(self, addr): self.addr = addr - return self.socket.bind (addr) + return self.socket.bind(addr) - def connect (self, address): + def connect(self, address): self.connected = 0 err = self.socket.connect_ex(address) + # XXX Should interpret Winsock return values if err in (EINPROGRESS, EALREADY, EWOULDBLOCK): return if err in (0, EISCONN): @@ -317,7 +312,8 @@ class dispatcher: else: raise socket.error, err - def accept (self): + def accept(self): + # XXX can return either an address pair or None try: conn, addr = self.socket.accept() return conn, addr @@ -327,9 +323,9 @@ class dispatcher: else: raise socket.error, why - def send (self, data): + def send(self, data): try: - result = self.socket.send (data) + result = self.socket.send(data) return result except socket.error, why: if why[0] == EWOULDBLOCK: @@ -338,9 +334,9 @@ class dispatcher: raise socket.error, why return 0 - def recv (self, buffer_size): + def recv(self, buffer_size): try: - data = self.socket.recv (buffer_size) + data = self.socket.recv(buffer_size) if not data: # a closed connection is indicated by signaling # a read condition, and having recv() return 0. @@ -356,27 +352,27 @@ class dispatcher: else: raise socket.error, why - def close (self): + def close(self): self.del_channel() self.socket.close() # cheap inheritance, used to pass all other attribute # references to the underlying socket object. - def __getattr__ (self, attr): - return getattr (self.socket, attr) + def __getattr__(self, attr): + return getattr(self.socket, attr) - # log and log_info maybe overriden to provide more sophisitcated + # log and log_info may be overridden to provide more sophisticated # logging and warning methods. In general, log is for 'hit' logging # and 'log_info' is for informational, warning and error logging. - def log (self, message): - sys.stderr.write ('log: %s\n' % str(message)) + def log(self, message): + sys.stderr.write('log: %s\n' % str(message)) - def log_info (self, message, type='info'): + def log_info(self, message, type='info'): if __debug__ or type != 'info': print '%s: %s' % (type, message) - def handle_read_event (self): + def handle_read_event(self): if self.accepting: # for an accepting socket, getting a read implies # that we are connected @@ -390,26 +386,26 @@ class dispatcher: else: self.handle_read() - def handle_write_event (self): + def handle_write_event(self): # getting a write implies that we are connected if not self.connected: self.handle_connect() self.connected = 1 self.handle_write() - def handle_expt_event (self): + def handle_expt_event(self): self.handle_expt() - def handle_error (self): + def handle_error(self): nil, t, v, tbinfo = compact_traceback() # sometimes a user repr method will crash. try: - self_repr = repr (self) + self_repr = repr(self) except: - self_repr = '<__repr__ (self) failed for object at %0x>' % id(self) + self_repr = '<__repr__(self) failed for object at %0x>' % id(self) - self.log_info ( + self.log_info( 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( self_repr, t, @@ -420,23 +416,23 @@ class dispatcher: ) self.close() - def handle_expt (self): - self.log_info ('unhandled exception', 'warning') + def handle_expt(self): + self.log_info('unhandled exception', 'warning') - def handle_read (self): - self.log_info ('unhandled read event', 'warning') + def handle_read(self): + self.log_info('unhandled read event', 'warning') - def handle_write (self): - self.log_info ('unhandled write event', 'warning') + def handle_write(self): + self.log_info('unhandled write event', 'warning') - def handle_connect (self): - self.log_info ('unhandled connect event', 'warning') + def handle_connect(self): + self.log_info('unhandled connect event', 'warning') - def handle_accept (self): - self.log_info ('unhandled accept event', 'warning') + def handle_accept(self): + self.log_info('unhandled accept event', 'warning') - def handle_close (self): - self.log_info ('unhandled close event', 'warning') + def handle_close(self): + self.log_info('unhandled close event', 'warning') self.close() # --------------------------------------------------------------------------- @@ -444,25 +440,26 @@ class dispatcher: # [for more sophisticated usage use asynchat.async_chat] # --------------------------------------------------------------------------- -class dispatcher_with_send (dispatcher): - def __init__ (self, sock=None): - dispatcher.__init__ (self, sock) +class dispatcher_with_send(dispatcher): + + def __init__(self, sock=None): + dispatcher.__init__(self, sock) self.out_buffer = '' - def initiate_send (self): + def initiate_send(self): num_sent = 0 - num_sent = dispatcher.send (self, self.out_buffer[:512]) + num_sent = dispatcher.send(self, self.out_buffer[:512]) self.out_buffer = self.out_buffer[num_sent:] - def handle_write (self): + def handle_write(self): self.initiate_send() - def writable (self): + def writable(self): return (not self.connected) or len(self.out_buffer) - def send (self, data): + def send(self, data): if self.debug: - self.log_info ('sending %s' % repr(data)) + self.log_info('sending %s' % repr(data)) self.out_buffer = self.out_buffer + data self.initiate_send() @@ -470,83 +467,28 @@ class dispatcher_with_send (dispatcher): # used for debugging. # --------------------------------------------------------------------------- -def compact_traceback (): - t,v,tb = sys.exc_info() +def compact_traceback(): + t, v, tb = sys.exc_info() tbinfo = [] - while 1: - tbinfo.append (( + assert tb # Must have a traceback + while tb: + tbinfo.append(( tb.tb_frame.f_code.co_filename, tb.tb_frame.f_code.co_name, str(tb.tb_lineno) )) tb = tb.tb_next - if not tb: - break # just to be safe del tb file, function, line = tbinfo[-1] - info = '[' + '] ['.join(map(lambda x: '|'.join(x), tbinfo)) + ']' + info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all (map=None): +def close_all(map=None): if map is None: - map=socket_map - for x in map.itervalues(): + map = socket_map + for x in map.values(): x.socket.close() map.clear() - -# Asynchronous File I/O: -# -# After a little research (reading man pages on various unixen, and -# digging through the linux kernel), I've determined that select() -# isn't meant for doing doing asynchronous file i/o. -# Heartening, though - reading linux/mm/filemap.c shows that linux -# supports asynchronous read-ahead. So _MOST_ of the time, the data -# will be sitting in memory for us already when we go to read it. -# -# What other OS's (besides NT) support async file i/o? [VMS?] -# -# Regardless, this is useful for pipes, and stdin/stdout... - -import os -if os.name == 'posix': - import fcntl - - class file_wrapper: - # here we override just enough to make a file - # look like a socket for the purposes of asyncore. - def __init__ (self, fd): - self.fd = fd - - def recv (self, *args): - return apply (os.read, (self.fd,)+args) - - def send (self, *args): - return apply (os.write, (self.fd,)+args) - - read = recv - write = send - - def close (self): - return os.close (self.fd) - - def fileno (self): - return self.fd - - class file_dispatcher (dispatcher): - def __init__ (self, fd): - dispatcher.__init__ (self) - self.connected = 1 - # set it to non-blocking mode - flags = fcntl.fcntl (fd, fcntl.F_GETFL, 0) - flags = flags | os.O_NONBLOCK - fcntl.fcntl (fd, fcntl.F_SETFL, flags) - self.set_file (fd) - - def set_file (self, fd): - self._fileno = fd - self.socket = file_wrapper (fd) - self.add_channel() -# vim:set shiftwidth=4 tabstop=8 expandtab textwidth=78: