Index: squid-tagger.py ================================================================== --- squid-tagger.py +++ squid-tagger.py @@ -28,191 +28,10 @@ raise ImportError("support for coroutines not available in this Psycopg version (%s)" % psycopg2.__version__) psycopg2.extensions.set_wait_callback(gevent_wait_callback) # //inclusion end -# tiny wrapper around a file to make reads from it geventable -# or should i move this somewhere? - -class FReadlineQueue(gevent.queue.Queue): - # storing file descriptor, leftover - __slots__ = frozenset(['_fd', '_tail']) - - def __init__(self, fd): - # initialising class - gevent.queue.Queue.__init__(self) - # storing file descriptor - self._fd = fd - # using empty tail - self._tail = '' - # setting up event - self._install_wait() - - def _install_wait(self): - fileno = self._fd.fileno() - # putting file to nonblocking mode - fcntl.fcntl(fileno, fcntl.F_SETFL, fcntl.fcntl(fileno, fcntl.F_GETFL) | os.O_NONBLOCK) - # installing event handler - gevent.core.read_event(fileno, self._wait_helper) - - def _wait_helper(self, ev, evtype): - # reading one buffer from stream - buf = self._fd.read(4096) - # splitting stream by line ends - rows = buf.split('\n') - # adding tail to the first element if there is some tail - if len(self._tail) > 0: - rows[0] = self._tail + rows[0] - # popping out last (incomplete) element - self._tail = rows.pop(-1) - # dropping all complete elements to the queue - for row in rows: - self.put_nowait(row) - if len(buf) > 0: - # no EOF, reinstalling event handler - gevent.core.read_event(self._fd.fileno(), self._wait_helper) - else: - # EOF found, sending EOF to queue - self.put_nowait(None) - -stdin = FReadlineQueue(sys.stdin) - -# wrapper around syslog, can be muted -class Logger(object): - __slots__ = frozenset(['_syslog']) - - def __init__(self): - config.section('log') - if config['silent'] == 'yes': - self._syslog = None - else: - import syslog - self._syslog = syslog - self._syslog.openlog(str('squidTag')) - - def info(self, message): - if self._syslog != None: - self._syslog.syslog(self._syslog.LOG_INFO, message) - - def notice(self, message): - if self._syslog != None: - self._syslog.syslog(self._syslog.LOG_NOTICE, message) - -# wrapper around database -class tagDB(object): - __slots__ = frozenset(['_cursor', '_db']) - - def __init__(self): - config.section('database') - self._db = psycopg2.connect( - database = config['database'], - host = config['host'], - user = config['user'], - password = config['password'], - ) - self._cursor = self._db.cursor() - - def _field_names(self): - names = [] - for record in self._cursor.description: - names.append(record.name) - return(names) - - def check(self, site, ip_address): - self._cursor.execute("select redirect_url, regexp from site_rule where site <@ tripdomain(%s) and netmask >>= %s order by array_length(site, 1) desc", [site, ip_address]) - return(self._cursor.fetchall()) - - def dump(self): - self._cursor.execute("select untrip(site) as site, tag::text, regexp from urls order by site, tag") - return(self._field_names(), self._cursor.fetchall()) - - def load(self, data): - if config.options.flush_db: - self._cursor.execute('delete from urls;') - bundle = [] - for row in data: - if len(row) == 2: - bundle.append([row[0], row[1], None]) - else: - bundle.append([row[0], row[1], row[2]]) - self._cursor.executemany("insert into urls (site, tag, regexp) values (tripdomain(%s), %s, %s)", bundle) - self._cursor.execute("update urls set regexp = NULL where regexp = ''") - self._db.commit() - - 
def load_conf(self, csv_data):
-		self._cursor.execute('delete from rules;')
-		bundle = []
-		for row in csv_data:
-			bundle.append([row[0], row[1], int(row[2]), int(row[3]), row[4], row[5], row[6]])
-		self._cursor.executemany("insert into rules (netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag) values (%s::text::cidr, %s, %s, %s, %s::text::time, %s::text::time, %s::text::text[])", bundle)
-		self._db.commit()
-
-	def dump_conf(self):
-		self._cursor.execute("select netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag::text from rules")
-		return(self._field_names(), self._cursor.fetchall())
-
-# abstract class with basic checking functionality
-class Checker(object):
-	__slots__ = frozenset(['_db', '_log', '_queue', '_request'])
-
-	def __init__(self, queue):
-		self._db = tagDB()
-		self._log = Logger()
-		self._log.info('started\n')
-		self._request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$')
-		self._queue = queue
-
-	def process(self, id, site, ip_address, url_path, line = None):
-		#self._log.info('trying {}\n'.format(site))
-		result = self._db.check(site, ip_address)
-		reply = None
-		for row in result:
-			if row != None and row[0] != None:
-				if row[1] != None:
-					self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], url_path))
-					try:
-						if re.compile(row[1]).match(url_path):
-							reply = row[0].format(url_path)
-						else:
-							continue
-					except:
-						self._log.info("can't compile regexp")
-				else:
-					reply = row[0].format(url_path)
-			if reply != None:
-				self.writeline('{} {}\n'.format(id, reply))
-				return(True)
-		self.writeline('{}\n'.format(id))
-
-	def check(self):
-		while True:
-			line = self._queue.get()
-			if line == None:
-				break
-			self._log.info('request: ' + line)
-			request = self._request.match(line)
-			if request:
-				id = request.group(1)
-				#proto = request.group(2)
-				site = request.group(3)
-				url_path = request.group(4)
-				ip_address = request.group(5)
-				self.process(id, site, ip_address, url_path, line)
-			else:
-				self._log.info('bad request\n')
-				self.writeline(line + '\n')
-
-	def writeline(self, string):
-		self._log.info('sending: ' + string)
-		sys.stdout.write(string)
-		sys.stdout.flush()
-
-	def loop(self):
-		pool = gevent.pool.Pool()
-		pool.spawn(self.check)
-		pool.join()
-
# this class processes the config file and substitutes default values
class Config:
	__slots__ = frozenset(['_config', '_default', '_section', 'options'])
	_default = {
		'log': {
@@ -273,10 +92,195 @@
			self._config.set(self._section, name, None)
		return(self._config.get(self._section, name))

# initializing and reading in config file
config = Config()
+
+# wrapper around syslog, can be muted
+class Logger(object):
+	__slots__ = frozenset(['_syslog'])
+
+	def __init__(self):
+		config.section('log')
+		if config['silent'] == 'yes':
+			self._syslog = None
+		else:
+			import syslog
+			self._syslog = syslog
+			self._syslog.openlog(str('squidTag'))
+
+	def info(self, message):
+		if self._syslog is not None:
+			self._syslog.syslog(self._syslog.LOG_INFO, message)
+
+	def notice(self, message):
+		if self._syslog is not None:
+			self._syslog.syslog(self._syslog.LOG_NOTICE, message)
+
+logger = Logger()
+
+# tiny wrapper around a file to make reads from it geventable
+# or should i move this somewhere?
+
+class FReadlineQueue(gevent.queue.Queue):
+	# storing file descriptor, leftover
+	__slots__ = frozenset(['_fd', '_tail'])
+
+	def __init__(self, fd):
+		# initialising class
+		gevent.queue.Queue.__init__(self)
+		# storing file descriptor
+		self._fd = fd
+		# using empty tail
+		self._tail = ''
+		# setting up event
+		self._install_wait()
+
+	def _install_wait(self):
+		fileno = self._fd.fileno()
+		# putting file to nonblocking mode
+		fcntl.fcntl(fileno, fcntl.F_SETFL, fcntl.fcntl(fileno, fcntl.F_GETFL) | os.O_NONBLOCK)
+		# installing event handler
+		gevent.core.read_event(fileno, self._wait_helper)
+
+	def _wait_helper(self, ev, evtype):
+		# reading one buffer from stream
+		buf = self._fd.read(4096)
+		# decoding the buffer (latin-1 maps bytes one-to-one) and splitting it by line ends
+		rows = buf.decode('latin-1').split('\n')
+		# adding tail to the first element if there is some tail
+		if len(self._tail) > 0:
+			rows[0] = self._tail + rows[0]
+		# popping out last (incomplete) element
+		self._tail = rows.pop(-1)
+		# dropping all complete elements to the queue
+		for row in rows:
+			self.put_nowait(row)
+			logger.info('request: ' + row)
+		if len(buf) > 0:
+			# no EOF, reinstalling event handler
+			gevent.core.read_event(self._fd.fileno(), self._wait_helper)
+		else:
+			# EOF found, sending EOF to queue
+			self.put_nowait(None)
+
+stdin = FReadlineQueue(sys.stdin)
+
+# wrapper around database
+class tagDB(object):
+	__slots__ = frozenset(['_cursor', '_db'])
+
+	def __init__(self):
+		config.section('database')
+		self._db = psycopg2.connect(
+			database = config['database'],
+			host = config['host'],
+			user = config['user'],
+			password = config['password'],
+		)
+		self._cursor = self._db.cursor()
+
+	def _field_names(self):
+		names = []
+		for record in self._cursor.description:
+			names.append(record.name)
+		return(names)
+
+	def check(self, site, ip_address):
+		# inner query selects matching rules, longest site match first; outer query drops duplicate redirect_url/regexp pairs
+		self._cursor.execute("select * from (select redirect_url, regexp from site_rule where site <@ tripdomain(%s) and netmask >>= %s order by array_length(site, 1) desc) a group by redirect_url, regexp", [site, ip_address])
+		return(self._cursor.fetchall())
+
+	def dump(self):
+		self._cursor.execute("select untrip(site) as site, tag::text, regexp from urls order by site, tag")
+		return(self._field_names(), self._cursor.fetchall())
+
+	def load(self, data):
+		if config.options.flush_db:
+			self._cursor.execute('delete from urls;')
+		bundle = []
+		for row in data:
+			if len(row) == 2:
+				bundle.append([row[0], row[1], None])
+			else:
+				bundle.append([row[0], row[1], row[2]])
+		self._cursor.executemany("insert into urls (site, tag, regexp) values (tripdomain(%s), %s, %s)", bundle)
+		self._cursor.execute("update urls set regexp = NULL where regexp = ''")
+		self._db.commit()
+
+	def load_conf(self, csv_data):
+		self._cursor.execute('delete from rules;')
+		bundle = []
+		for row in csv_data:
+			bundle.append([row[0], row[1], int(row[2]), int(row[3]), row[4], row[5], row[6]])
+		self._cursor.executemany("insert into rules (netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag) values (%s::text::cidr, %s, %s, %s, %s::text::time, %s::text::time, %s::text::text[])", bundle)
+		self._db.commit()
+
+	def dump_conf(self):
+		self._cursor.execute("select netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag::text from rules")
+		return(self._field_names(), self._cursor.fetchall())
+
+# abstract class with basic checking functionality
+class Checker(object):
+	__slots__ = frozenset(['_db', '_log', '_queue', '_request'])
+
+	def __init__(self, queue, logger):
+		self._db = tagDB()
+		self._log = logger
+		self._log.info('started\n')
+		self._request = re.compile(r'^([0-9]+) (http|ftp)://([-\w.:]+)/([^ ]*) ([0-9.]+)/(-|[\w.]+) (-|\w+) (-|GET|HEAD|POST).*$')
+		self._queue = queue
+
+	def process(self, id, site, ip_address, url_path, line = None):
+		#self._log.info('trying {}\n'.format(site))
+		result = self._db.check(site, ip_address)
+		reply = None
+		#self._log.info('got {} lines from database'.format(len(result)))
+		for row in result:
+			if row is not None and row[0] is not None:
+				if row[1] is not None:
+					self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], url_path))
+					try:
+						if re.compile(row[1]).match(url_path):
+							reply = row[0].format(url_path)
+						else:
+							continue
+					except re.error:
+						self._log.info("can't compile regexp")
+				else:
+					reply = row[0].format(url_path)
+			if reply is not None:
+				self.writeline('{} {}\n'.format(id, reply))
+				return(True)
+		self.writeline('{}\n'.format(id))
+
+	def check(self):
+		while True:
+			line = self._queue.get()
+			if line is None:
+				break
+			#self._log.info('request: ' + line)
+			request = self._request.match(line)
+			if request:
+				id = request.group(1)
+				#proto = request.group(2)
+				site = request.group(3)
+				url_path = request.group(4)
+				ip_address = request.group(5)
+				self.process(id, site, ip_address, url_path, line)
+			else:
+				self._log.info('bad request\n')
+				self.writeline(line + '\n')
+
+	def writeline(self, string):
+		self._log.info('sending: ' + string)
+		sys.stdout.write(string)
+		sys.stdout.flush()
+
+	def loop(self):
+		pool = gevent.pool.Pool()
+		pool.spawn(self.check)
+		pool.join()

if config.options.dump or config.options.load or config.options.dump_conf or config.options.load_conf:
	import csv
	tagdb = tagDB()
@@ -308,6 +312,6 @@
	assert first_row == fields, 'File must contain csv data with these columns: ' + repr(fields)
	load(csv_reader)
else:
	# main loop
-	Checker(stdin).loop()
+	Checker(stdin, logger).loop()
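
Note on the wire format (illustration, not part of the patch): the request regexp in Checker matches what Squid's url_rewrite_program protocol sends when concurrency is enabled, i.e. a numeric channel ID followed by the URL, the client address as ip/fqdn, the ident user and the request method, and the helper must answer with the same ID plus an optional replacement URL. A hypothetical exchange, assuming a site_rule row whose redirect_url is "http://proxy.example/blocked?url={}" (the "{}" placeholder is filled with the URL path via str.format in process()):

	squid -> helper:  0 http://example.com/some/page 192.0.2.15/- - GET
	helper -> squid:  0 http://proxy.example/blocked?url=some/page

When no rule matches, writeline() answers with the bare ID ("0\n"), which tells Squid to pass the request through unmodified.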