# Patch summary (text above the "Index:" header is commentary, not part of any hunk).
#
# Hunk 1 (@@ -137,106 +137,90 @@) reworks the two gevent queue wrappers:
#   * FReadlineQueue
#       - __init__ loses the `closefd` parameter and the io.FileIO wrapper; it now
#         stores fd.fileno() in `_fn`, calls gevent.os.make_nonblocking(fd) and
#         starts the reader with gevent.spawn(self._frobber).
#       - The callback pair _install_wait/_wait_helper (gevent.core.read_event) is
#         replaced by a single `_frobber` loop: read up to 4096 bytes with
#         gevent.os.nb_read, decode as 'l1', split on '\n', prepend the saved
#         `_tail` to the first piece, keep the last (incomplete) piece as the new
#         `_tail`, queue and log every complete row. A zero-length read ends the
#         loop and None is queued as the EOF marker.
#       - Module-level `stdin` is now built as FReadlineQueue(sys.stdin).
#   * FWritelineQueue
#       - __init__ loses `closefd` and io.FileIO in the same way; `_fileno`
#         becomes `_fn` and non-blocking mode is set via gevent.os.make_nonblocking.
#       - put() still delegates to JoinableQueue.put, then spawns `_frobber`
#         instead of registering a gevent.core.write_event callback.
#       - `_frobber` (formerly _wait_helper) pulls items with get_nowait, writes
#         with gevent.os.nb_write, keeps any unwritten remainder in `_tail`, and
#         returns once the queue raises gevent.queue.Empty. The old `break`/`else`
#         exits and the write_event re-registration are removed.
# Hunk 2 (@@ -304,11 +288,11 @@) updates the one visible caller to
# FWritelineQueue(sys.stdout), matching the new one-argument constructor.
#
# NOTE(review): in this copy the original line breaks and indentation have been
#   collapsed, so the hunks cannot be applied as they stand; obtain the patch from
#   the repository before applying.
# NOTE(review): put() spawns a new `_frobber` on every call and all of them read
#   and update the same `self._tail`. If gevent.os.nb_write yields to other
#   greenlets while waiting for the descriptor, two loops could write the same
#   tail -- confirm, and consider starting a single writer greenlet.
# NOTE(review): on EOF the reader queues None without queueing what is left in
#   `_tail`, so a final line without a trailing '\n' is never delivered (the
#   removed _wait_helper behaved the same way).
# NOTE(review): `str(...).encode('utf-8') + '\n'` (unchanged context line) adds a
#   str literal to the result of encode(); presumably this targets Python 2 --
#   verify before running under Python 3.
Index: squid-tagger.py ================================================================== --- squid-tagger.py +++ squid-tagger.py @@ -137,106 +137,90 @@ # tiny wrapper around a file to make reads from it geventable # or should i move this somewhere? class FReadlineQueue(gevent.queue.Queue): - # storing file descriptor, leftover - __slots__ = frozenset(['_io', '_fileno', '_tail']) + # storing fileno descriptor, leftover + __slots__ = frozenset(['_fn', '_tail']) - def __init__(self, fd, closefd = True): - import io + def __init__(self, fd): # initialising class gevent.queue.Queue.__init__(self) - # storing file descriptor - self._fileno = fd.fileno() - self._io = io.FileIO(self._fileno, 'r', closefd) + self._fn = fd.fileno() # using empty tail self._tail = '' - # setting up event - self._install_wait() - - def _install_wait(self): # putting file to nonblocking mode - fcntl.fcntl(self._fileno, fcntl.F_SETFL, fcntl.fcntl(self._fileno, fcntl.F_GETFL) | os.O_NONBLOCK) - # installing event handler - gevent.core.read_event(self._fileno, self._wait_helper) - - def _wait_helper(self, ev, evtype): - # reading one buffer from stream - buf = self._io.read(4096) - # splitting stream by line ends - rows = buf.decode('l1').split('\n') - # adding tail to the first element if there is some tail - if len(self._tail) > 0: - rows[0] = self._tail + rows[0] - # popping out last (incomplete) element - self._tail = rows.pop(-1) - # dropping all complete elements to the queue - for row in rows: - self.put_nowait(row) - logger.info('< ' + row) - if len(buf) > 0: - # no EOF, reinstalling event handler - gevent.core.read_event(self._fileno, self._wait_helper) - else: - # EOF found, sending EOF to queue - self.put_nowait(None) - -stdin = FReadlineQueue(sys.stdin, False) + gevent.os.make_nonblocking(fd) + # starting main loop + gevent.spawn(self._frobber) + + def _frobber(self): + while True: + # reading one buffer from stream + buf = gevent.os.nb_read(self._fn, 4096) + # EOF found + if len(buf) 
== 0: + break + # splitting stream by line ends + rows = buf.decode('l1').split('\n') + # adding tail to the first element if there is some tail + if len(self._tail) > 0: + rows[0] = self._tail + rows[0] + # popping out last (incomplete) element + self._tail = rows.pop(-1) + # dropping all complete elements to the queue + for row in rows: + self.put_nowait(row) + logger.info('< ' + row) + # sending EOF + self.put_nowait(None) + +stdin = FReadlineQueue(sys.stdin) # wrapper against file handler that makes possible to queue some writes without stalling class FWritelineQueue(gevent.queue.JoinableQueue): - # storing fileno, io interface, leftover - __slots__ = frozenset(['_fileno', '_io', '_tail']) + # storing fileno, leftover + __slots__ = frozenset(['_fn', '_tail']) - def __init__(self, fd, closefd = True): - import io + def __init__(self, fd): # initialising class gevent.queue.JoinableQueue.__init__(self) # storing fileno - self._fileno = fd.fileno() - # creating interface - self._io = io.FileIO(self._fileno, 'w', closefd) + self._fn = fd.fileno() + # putting file to nonblocking mode + gevent.os.make_nonblocking(fd) # using empty tail self._tail = None - # putting file to nonblocking mode - fcntl.fcntl(self._fileno, fcntl.F_SETFL, fcntl.fcntl(self._fileno, fcntl.F_GETFL) | os.O_NONBLOCK) def __del__(self): # purge queue before deleting if not self.empty(): self.join() def put(self, item, block=True, timeout=None): # calling real put gevent.queue.JoinableQueue.put(self, item, block, timeout) - # installing event handler - gevent.core.write_event(self._fileno, self._wait_helper) + # starting main loop + gevent.spawn(self._frobber) - def _wait_helper(self, ev, evtype): - # XXX ev, evtype checking? 
+ def _frobber(self): # checking leftover while True: if self._tail == None: try: self._tail = str(self.get_nowait()).encode('utf-8') + '\n' except gevent.queue.Empty: self._tail = None return # writing tail - written = self._io.write(self._tail) + written = gevent.os.nb_write(self._fn, self._tail) length = len(self._tail) if written == length: self._tail = None elif written < length: self._tail = self._tail[written:] - break - else: - break - # reinstalling event handler - gevent.core.write_event(self._fileno, self._wait_helper) # wrapper around database class tagDB(object): __slots__ = frozenset(['_cursor', '_db']) @@ -304,11 +288,11 @@ self._db = tagDB() self._log = logger self._log.info('started') self._request = re.compile('^([0-9]+)\ ((http|ftp):\/\/)?([-\w.]+)(:[0-9]+)?(\/([^ ]*))?\ ([0-9.:]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST|CONNECT).*$') self._queue = queue - self._stdout = FWritelineQueue(sys.stdout, False) + self._stdout = FWritelineQueue(sys.stdout) def process(self, id, site, ip_address, url_path, line = None): #self._log.info('trying {}'.format(site)) result = self._db.check(site, ip_address) reply = None