Index: squid-tagger.py
==================================================================
--- squid-tagger.py
+++ squid-tagger.py
@@ -94,13 +94,49 @@
 # initializing and reading in config file
 config = Config()
 
 import logging, logging.handlers
+
+# wrapper around the logging handler that queues records so emitting never stalls the caller
+
+class SysLogHandlerQueue(logging.handlers.SysLogHandler):
+    __slots__ = frozenset(['_event', '_tail', '_workers'])
+
+    def __init__(self):
+        logging.handlers.SysLogHandler.__init__(self, '/dev/log')
+        self._event = gevent.event.Event()
+        self._event.set()
+        self._tail = gevent.queue.Queue()
+        self._workers = set()
+
+    def emit(self, record):
+        # my syslog is broken and cannot handle the UTF-8 BOM
+        record.msg = str(record.msg)
+        self._tail.put(record)
+        if self._tail.qsize() == 1:
+            # the queue was empty before this put, so no worker is draining it: spawn one
+            # all workers are tracked so we can kill them in close()
+            self._workers.add(gevent.spawn(self._writer))
+
+    def _writer(self):
+        # take the event as a lock so we can be sure we are the only writer
+        self._event.wait()
+        self._event.clear()
+        while not self._tail.empty():
+            logging.handlers.SysLogHandler.emit(self, self._tail.get())
+        self._event.set()
+        self._workers.remove(gevent.getcurrent())
+
+    def close(self):
+        for worker in tuple(self._workers):  # copy the set: workers remove themselves from it
+            gevent.kill(worker)
+        logging.handlers.SysLogHandler.close(self)
+
 logger = logging.getLogger('squidTag')
 logger.setLevel(logging.INFO)
-handler = logging.handlers.SysLogHandler('/dev/log')
+handler = SysLogHandlerQueue()
 handler.setFormatter(logging.Formatter(str('squidTag[%(process)s]: %(message)s')))
 logger.addHandler(handler)
 
 # tiny wrapper around a file to make reads from it geventable
 # or should i move this somewhere?
@@ -137,19 +173,21 @@
             # popping out last (incomplete) element
             self._tail = rows.pop(-1)
             # dropping all complete elements to the queue
             for row in rows:
                 self.put_nowait(row)
-                logger.info(str('< ' + row))
+                logger.info('< ' + row)
         if len(buf) > 0:
             # no EOF, reinstalling event handler
             gevent.core.read_event(self._fd.fileno(), self._wait_helper)
         else:
             # EOF found, sending EOF to queue
             self.put_nowait(None)
 
 stdin = FReadlineQueue(sys.stdin)
+
+# wrapper around a file handle that queues writes so they don't stall the caller
 class FWritelineQueue(gevent.queue.JoinableQueue):
     # storing fileno, io interface, leftover
     __slots__ = frozenset(['_fileno', '_io', '_tail'])
@@ -265,65 +303,60 @@
 
     __slots__ = frozenset(['_db', '_log', '_queue', '_request', '_stdout'])
 
     def __init__(self, queue, logger):
         self._db = tagDB()
         self._log = logger
-        self._log.info(str('started'))
+        self._log.info('started')
         self._request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$')
         self._queue = queue
         self._stdout = FWritelineQueue(sys.stdout, False)
 
     def process(self, id, site, ip_address, url_path, line = None):
-        #self._log.info(str('trying {}'.format(site)))
+        #self._log.info('trying {}'.format(site))
         result = self._db.check(site, ip_address)
         reply = None
-        #self._log.info(str('got {} lines from database'.format(len(result))))
+        #self._log.info('got {} lines from database'.format(len(result)))
         for row in result:
             if row != None and row[0] != None:
                 if row[1] != None:
-                    self._log.info(str('trying regexp "{}" versus "{}"'.format(row[1], url_path)))
+                    self._log.info('trying regexp "{}" versus "{}"'.format(row[1], url_path))
                     try:
                         if re.compile(row[1]).match(url_path):
                             reply = row[0].format(url_path)
                         else:
                             continue
                     except:
self._log.info(str("can't compile regexp")) + self._log.info("can't compile regexp") else: reply = row[0].format(url_path) if reply != None: self.writeline('{} {}'.format(id, reply)) return(True) self.writeline('{}'.format(id)) - def check(self): + def loop(self): while True: line = self._queue.get() if line == None: break - #self._log.info(str('request: ' + line)) + #self._log.info('request: ' + line) request = self._request.match(line) if request: id = request.group(1) #proto = request.group(2) site = request.group(3) url_path = request.group(4) ip_address = request.group(5) self.process(id, site, ip_address, url_path, line) else: - self._log.info(str('bad request')) + self._log.info('bad request') self.writeline(line) def writeline(self, string): - self._log.info(str('> ' + string)) + self._log.info('> ' + string) self._stdout.put(string) - def loop(self): - pool = gevent.pool.Pool() - pool.spawn(self.check) - pool.join() - if config.options.dump or config.options.load or config.options.dump_conf or config.options.load_conf: import csv tagdb = tagDB() data_fields = ['site', 'tag', 'regexp']