Index: squid-tagger.py
==================================================================
--- squid-tagger.py
+++ squid-tagger.py
@@ -163,10 +163,76 @@
 			# EOF found, sending EOF to queue
 			self.put_nowait(None)
 
 stdin = FReadlineQueue(sys.stdin)
 
+class FWritelineQueue(gevent.queue.JoinableQueue):
+	# storing fileno, io interface, leftover, handler state
+	__slots__ = frozenset(['_fileno', '_io', '_tail', '_armed'])
+
+	def __init__(self, fd, closefd = True):
+		import io
+		# initialising class
+		gevent.queue.JoinableQueue.__init__(self)
+		# storing fileno
+		self._fileno = fd.fileno()
+		# creating interface
+		self._io = io.FileIO(self._fileno, 'w', closefd)
+		# using empty tail
+		self._tail = None
+		# no write handler is installed yet
+		self._armed = False
+		# putting file to nonblocking mode
+		fcntl.fcntl(self._fileno, fcntl.F_SETFL, fcntl.fcntl(self._fileno, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+	def __del__(self):
+		# purge queue before deleting; unfinished_tasks also counts a
+		# line that has been dequeued but is only partially written
+		if self.unfinished_tasks:
+			self.join()
+
+	def put(self, item, block=True, timeout=None):
+		# calling real put
+		gevent.queue.JoinableQueue.put(self, item, block, timeout)
+		# installing event handler unless one is already pending, so
+		# bursts of put() calls do not stack up duplicate handlers
+		if not self._armed:
+			self._arm()
+
+	def _arm(self):
+		# waiting for the descriptor to become writable
+		self._armed = True
+		gevent.core.write_event(self._fileno, self._wait_helper)
+
+	def _wait_helper(self, ev, evtype):
+		# XXX ev, evtype checking?
+		# the handler has fired and is no longer pending
+		self._armed = False
+		while True:
+			# fetching next line once the previous one is fully written
+			if self._tail is None:
+				try:
+					item = self.get_nowait()
+				except gevent.queue.Empty:
+					# nothing left, next put() installs a new handler
+					return
+				self._tail = str(item).encode('utf-8') + b'\n'
+			# writing tail
+			written = self._io.write(self._tail)
+			# nonblocking FileIO.write() returns None when nothing fits
+			if not written:
+				break
+			if written < len(self._tail):
+				# partial write, keeping leftover for the next round
+				self._tail = self._tail[written:]
+				break
+			# line fully written, letting join() know about it
+			self._tail = None
+			self.task_done()
+		# waiting for the descriptor to become writable again
+		self._arm()
+
 # wrapper around database
 class tagDB(object):
 	__slots__ = frozenset(['_cursor', '_db'])
 
 	def __init__(self):
@@ -225,28 +291,29 @@
 		self._cursor.execute("select netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag::text from rules")
 		return(self._field_names(), self._cursor.fetchall())
 
 # abstract class with basic checking functionality
 class Checker(object):
-	__slots__ = frozenset(['_db', '_log', '_queue', '_request'])
+	__slots__ = frozenset(['_db', '_log', '_queue', '_request', '_stdout'])
 
 	def __init__(self, queue, logger):
 		self._db = tagDB()
 		self._log = logger
-		self._log.info('started\n')
+		self._log.info('started')
 		self._request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$')
 		self._queue = queue
+		self._stdout = FWritelineQueue(sys.stdout, False)
 
 	def process(self, id, site, ip_address, url_path, line = None):
-		#self._log.info('trying {}\n'.format(site))
+		#self._log.info('trying {}'.format(site))
 		result = self._db.check(site, ip_address)
 		reply = None
 		#self._log.info('got {} lines from database'.format(len(result)))
 		for row in result:
 			if row != None and row[0] != None:
 				if row[1] != None:
-					self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], url_path))
+					self._log.info('trying regexp "{}" versus "{}"'.format(row[1], url_path))
 					try:
 						if re.compile(row[1]).match(url_path):
 							reply = row[0].format(url_path)
 						else:
 							continue
@@ -253,13 +320,13 @@
 					except:
 						self._log.info("can't compile regexp")
 				else:
 					reply = row[0].format(url_path)
 		if reply != None:
-			self.writeline('{} {}\n'.format(id, reply))
+			self.writeline('{} {}'.format(id, reply))
 			return(True)
-		self.writeline('{}\n'.format(id))
+		self.writeline('{}'.format(id))
 
 	def check(self):
 		while True:
 			line = self._queue.get()
 			if line == None:
@@ -273,16 +340,15 @@
 				url_path = request.group(4)
 				ip_address = request.group(5)
 				self.process(id, site, ip_address, url_path, line)
 			else:
-				self._log.info('bad request\n')
-				self.writeline(line + '\n')
+				self._log.info('bad request')
+				self.writeline(line)
 
 	def writeline(self, string):
 		self._log.info('sending: ' + string)
-		sys.stdout.write(string)
-		sys.stdout.flush()
+		self._stdout.put(string)
 
 	def loop(self):
 		pool = gevent.pool.Pool()
 		pool.spawn(self.check)
 		pool.join()