Index: squid-tagger.conf ================================================================== --- squid-tagger.conf +++ squid-tagger.conf @@ -18,5 +18,11 @@ # This section can be used to turn off some logging. [log] # There would be no logs generated if 'silent' is set to 'yes'. silent = no + +# This section control reactor wich would service main loop. +[reactor] + +# Possible reactor types: plain, thread and kqueue (broken) +reactor = thread Index: squid-tagger.py ================================================================== --- squid-tagger.py +++ squid-tagger.py @@ -47,17 +47,17 @@ def check(self, site, ip_address): return(self._check_stmt(site, ip_address)) # abstract class with basic checking functionality class Checker: - __slots__ = frozenset(['_db', '_log', '_queue']) + __slots__ = frozenset(['_db', '_log']) def __init__(self): self._db = tagDB() self._log = Logger() - def process(self, id, site, ip_address, url_path): + def process(self, id, site, ip_address, url_path, line = None): self._log.info('trying {}\n'.format(site)) result = self._db.check(site, ip_address) #reply = '{}://{}/{}'.format(req[4], req[1], req[3]) reply = '-' for row in result: @@ -80,58 +80,124 @@ id = request.group(1) #proto = request.group(2) site = request.group(3) url_path = request.group(4) ip_address = request.group(5) - self.insert(id, site, ip_address, url_path) - - self._log.info('request {} queued ({})\n'.format(id, line)) + self.process(id, site, ip_address, url_path, line) else: self._log.info('bad request\n') self.writeline(line) - def insert(self, id, site, ip_address, url_path): - self._queue.append((id, site, ip_address, url_path)) - def writeline(self, string): self._log.info('sending: ' + string) sys.stdout.write(string) sys.stdout.flush() + def loop(self): + while True: + line = sys.stdin.readline() + if len(line) == 0: + break + self.check(line) + # threaded checking facility class CheckerThread(Checker): - __slots__ = frozenset(['_lock', '_lock_queue']) + __slots__ = frozenset(['_lock', '_lock_exit', '_lock_queue', '_queue']) def __init__(self): + # basic initialisation Checker.__init__(self) + # Spin lock. Loop acquires it on start then releases it when holding queue # lock. This way the thread proceeds without stops while queue has data and # gets stalled when no data present. The lock is released by queue writer # after storing something into the queue self._lock = _thread.allocate_lock() + self._lock_exit = _thread.allocate_lock() self._lock_queue = _thread.allocate_lock() self._lock.acquire() self._queue = [] _thread.start_new_thread(self._start, ()) def _start(self): while True: self._lock.acquire() - self._lock_queue.acquire() - # yes this should be written this way, and yes, this is why I hate threading - if len(self._queue) > 1 and self._lock.locked(): - self._lock.release() - req = self._queue.pop(0) - self._lock_queue.release() - self.process(req[0], req[1], req[2], req[3]) - - def insert(self, id, site, ip_address, url_path): - self._lock_queue.acquire() - Checker.insert(self, id, site, ip_address, url_path) - if self._lock.locked(): - self._lock.release() - self._lock_queue.release() + with self._lock_queue: + # yes this should be written this way, and yes, this is why I hate threading + if len(self._queue) > 1: + if self._lock.locked(): + self._lock.release() + req = self._queue.pop(0) + Checker.process(self, req[0], req[1], req[2], req[3]) + with self._lock_queue: + if len(self._queue) == 0: + if self._lock_exit.locked(): + self._lock_exit.release() + + def process(self, id, site, ip_address, url_path, line): + with self._lock_queue: + self._queue.append((id, site, ip_address, url_path)) + self._log.info('request {} queued ({})\n'.format(id, line)) + if not self._lock_exit.locked(): + self._lock_exit.acquire() + if self._lock.locked(): + self._lock.release() + + def loop(self): + while True: + line = sys.stdin.readline() + if len(line) == 0: + break + self.check(line) + self._lock_exit.acquire() + +# kqueue enable class for BSD's XXX broken for now +class CheckerKqueue(Checker): + __slots__ = frozenset(['_kq', '_select', '_queue']) + + def __init__(self): + # basic initialisation + Checker.__init__(self) + + # importing select module + import select + self._select = select + + # kreating kqueue + self._kq = self._select.kqueue() + assert (self._kq.fileno() != -1) + + # watching sys.stdin for data + self._kq.control([self._select.kevent(sys.stdin, self._select.KQ_FILTER_READ, self._select.KQ_EV_ADD)], 0) + + # creating data queue + self._queue = [] + + def loop(self): + # Wait for data by default + timeout = None + while True: + # checking if there is any data + kevs = self._kq.control(None, 1, timeout) + if len(kevs) > 0: + #kev = kevs[0] + # XXX add some code to read only known data size and check for newlines + line = sys.stdin.readline() + # add data to the queue + self.check(line) + # don't wait for data, start processing + timeout = 0 + else: + req = self._queue.pop(0) + Checker.process(self, req[0], req[1], req[2], req[3]) + if len(self._queue) == 0: + # wait for data - we have nothing to process + timeout = None + + def process(self, id, site, ip_address, url_path, line): + self._queue.append((id, site, ip_address, url_path)) + self._log.info('request {} queued ({})\n'.format(id, line)) # this classes processes config file and substitutes default values class Config: __slots__ = frozenset(['_config', '_default', '_section']) _default = { @@ -185,11 +251,11 @@ config = Config() config.section('reactor') if config['reactor'] == 'thread': checker = CheckerThread() +elif config['reactor'] == 'plain': + checker = Checker() +elif config['reactor'] == 'kqueue': + checker = CheckerKqueue() -while True: - line = sys.stdin.readline() - if len(line) == 0: - break - checker.check(line) +checker.loop()