Move the lookup/reply logic out of CheckerThread into a new Checker base
class, make CheckerThread derive from it, and pick the checker through a
new 'reactor' config section (default: 'thread').

Notes on the added lines:
  * Config.__slots__ does not list '_default': it is a class attribute, and
    a slot name that is also a class variable makes class creation fail
    with ValueError.
  * An unknown 'reactor' value exits with a message instead of leaving
    'checker' unbound.
  * The request regexp is a raw string (same pattern value); None tests in
    Checker.process use identity.

NOTE(review): line breaks, indentation (tabs assumed) and blank context
lines were reconstructed from a whitespace-collapsed copy of this patch;
verify the context lines against the repository before applying.

Index: squid-tagger.py
==================================================================
--- squid-tagger.py
+++ squid-tagger.py
@@ -1,9 +1,10 @@
 #!/usr/bin/env python3.1
 
 import configparser, optparse, os, postgresql.api, re, sys, _thread
 
+# wrapper around syslog, can be muted
 class Logger:
 	__slots__ = frozenset(['_syslog'])
 
 	def __init__(self):
 		config.section('log')
@@ -20,10 +21,11 @@
 
 	def notice(self, message):
 		if self._syslog:
 			self._syslog.syslog(self._syslog.LOG_NOTICE, message)
 
+# wrapper around database
 class tagDB:
 	__slots__ = frozenset(['_prepared', '_check_stmt', '_db'])
 
 	def __init__(self):
 		self._prepared = set()
@@ -40,19 +42,77 @@
 					config['host'],
 					config['database'],
 			) )
 		return(self._db)
 
-	def check(self, ip_address, site):
-		return self._check_stmt(site, ip_address)
-
-class CheckerThread:
-	__slots__ = frozenset(['_db', '_lock', '_lock_queue', '_log', '_queue'])
-
-	def __init__(self, db, log):
-		self._db = db
-		self._log = log
+	# runs the check statement for the given site and client address
+	def check(self, site, ip_address):
+		return(self._check_stmt(site, ip_address))
+
+# abstract class with basic checking functionality
+class Checker:
+	__slots__ = frozenset(['_db', '_log', '_queue'])
+
+	def __init__(self):
+		self._db = tagDB()
+		self._log = Logger()
+
+	# looks the site up in the database and writes the reply for request id:
+	# '302:' + target of the first row that applies, '-' when none does
+	def process(self, id, site, ip_address, url_path):
+		self._log.info('trying {}\n'.format(site))
+		result = self._db.check(site, ip_address)
+		#reply = '{}://{}/{}'.format(req[4], req[1], req[3])
+		reply = '-'
+		for row in result:
+			if row is not None and row[0] is not None:
+				if row[1] is not None:
+					self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], url_path))
+					if re.compile(row[1]).match(url_path):
+						reply = '302:' + row[0]
+						break
+					else:
+						continue
+				else:
+					reply = '302:' + row[0]
+					break
+		self.writeline('{} {}\n'.format(id, reply))
+
+	# parses one request line; a well-formed request is handed to insert(),
+	# anything else is written back unchanged
+	def check(self, line):
+		request = re.compile(r'^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$').match(line)
+		if request:
+			id = request.group(1)
+			#proto = request.group(2)
+			site = request.group(3)
+			url_path = request.group(4)
+			ip_address = request.group(5)
+			self.insert(id, site, ip_address, url_path)
+
+			self._log.info('request {} queued ({})\n'.format(id, line))
+		else:
+			self._log.info('bad request\n')
+			self.writeline(line)
+
+	# appends the request to self._queue; Checker.__init__ does not create the
+	# queue, so presumably the subclass sets it up - TODO confirm
+	def insert(self, id, site, ip_address, url_path):
+		self._queue.append((id, site, ip_address, url_path))
+
+	# logs the reply, writes it to stdout and flushes at once
+	def writeline(self, string):
+		self._log.info('sending: ' + string)
+		sys.stdout.write(string)
+		sys.stdout.flush()
+
+# threaded checking facility
+class CheckerThread(Checker):
+	__slots__ = frozenset(['_lock', '_lock_queue'])
+
+	def __init__(self):
+		Checker.__init__(self)
 		# Spin lock. Loop acquires it on start then releases it when holding queue
 		# lock. This way the thread proceeds without stops while queue has data and
 		# gets stalled when no data present. The lock is released by queue writer
 		# after storing something into the queue
 		self._lock = _thread.allocate_lock()
@@ -68,55 +128,30 @@
 			# yes this should be written this way, and yes, this is why I hate threading
 			if len(self._queue) > 1 and self._lock.locked():
 				self._lock.release()
 			req = self._queue.pop(0)
 			self._lock_queue.release()
-			self._log.info('trying {}\n'.format(req[1]))
-			result = self._db.check(req[2], req[1])
-			#reply = '{}://{}/{}'.format(req[4], req[1], req[3])
-			reply = '-'
-			for row in result:
-				if row != None and row[0] != None:
-					if row[1] != None:
-						self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], req[3]))
-						if re.compile(row[1]).match(req[3]):
-							reply = '302:' + row[0]
-							break
-						else:
-							continue
-					else:
-						reply = '302:' + row[0]
-						break
-			writeline('{} {}\n'.format(req[0], reply))
-
-	def check(self, line):
-		request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$').match(line)
-		if request:
-			id = request.group(1)
-			#proto = request.group(2)
-			site = request.group(3)
-			url_path = request.group(4)
-			ip_address = request.group(5)
-			self._lock_queue.acquire()
-			self._queue.append((id, site, ip_address, url_path))
-			if self._lock.locked():
-				self._lock.release()
-			self._lock_queue.release()
-			self._log.info('request {} queued ({})\n'.format(id, line))
-		else:
-			self._log.info('bad request\n')
-			writeline(line)
-
-def writeline(string):
-	log.info('sending: ' + string)
-	sys.stdout.write(string)
-	sys.stdout.flush()
+			self.process(req[0], req[1], req[2], req[3])
+
+	# stores the request while holding the queue lock, then releases the spin
+	# lock (if it is held) so the stalled loop can proceed
+	def insert(self, id, site, ip_address, url_path):
+		self._lock_queue.acquire()
+		Checker.insert(self, id, site, ip_address, url_path)
+		if self._lock.locked():
+			self._lock.release()
+		self._lock_queue.release()
 
 # this classes processes config file and substitutes default values
 class Config:
-	__slots__ = frozenset(['_config', '_defaults', '_section'])
-	_defaults = {
+	# _default is a class attribute, so it must not be listed in __slots__:
+	# a slot name that is also a class variable makes class creation fail
+	__slots__ = frozenset(['_config', '_section'])
+	_default = {
+		'reactor': {
+			'reactor': 'thread',
+		},
 		'log': {
 			'silent': 'no',
 		},
 		'database': {
 			'host': 'localhost',
@@ -147,26 +182,30 @@
 
 	# function to get config parameter, if parameter doesn't exists the default
 	# value or None is substituted
 	def __getitem__(self, name):
 		if not self._config.has_option(self._section, name):
-			if self._default.has_key(self._section):
-				if self._default[self._section].has_key(name):
+			if self._section in self._default:
+				if name in self._default[self._section]:
 					self._config.set(self._section, name, self._default[self._section][name])
 				else:
 					self._config.set(self._section, name, None)
 			else:
 				self._config.set(self._section, name, None)
-		return self._config.get(self._section, name)
+		return(self._config.get(self._section, name))
 
 # initializing and reading in config file
 config = Config()
 
-log = Logger()
-db = tagDB()
-checker = CheckerThread(db,log)
+config.section('reactor')
+if config['reactor'] == 'thread':
+	checker = CheckerThread()
+else:
+	# no other reactor is implemented; stop here rather than fail with a
+	# NameError on 'checker' when the first request arrives
+	sys.exit('unknown reactor: {}'.format(config['reactor']))
 
 while True:
 	line = sys.stdin.readline()
 	if len(line) == 0:
 		break
 	checker.check(line)