#!/usr/bin/env python
from __future__ import division, print_function, unicode_literals
import gevent.monkey
gevent.monkey.patch_all()
import fcntl, gevent.core, gevent.pool, gevent.queue, gevent.socket, os, psycopg2, re, sys
# //inclusion start
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
# and licensed under the MIT license:
def gevent_wait_callback(conn, timeout=None):
"""A wait callback useful to allow gevent to work with Psycopg."""
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_READ:
gevent.socket.wait_read(conn.fileno(), timeout=timeout)
elif state == psycopg2.extensions.POLL_WRITE:
gevent.socket.wait_write(conn.fileno(), timeout=timeout)
else:
raise psycopg2.OperationalError("Bad result from poll: %r" % state)
if not hasattr(psycopg2.extensions, 'set_wait_callback'):
raise ImportError("support for coroutines not available in this Psycopg version (%s)" % psycopg2.__version__)
psycopg2.extensions.set_wait_callback(gevent_wait_callback)
# //inclusion end
# this classes processes config file and substitutes default values
class Config:
__slots__ = frozenset(['_config', '_default', '_section', 'options'])
_default = {
'log': {
'silent': 'no',
},
'database': {
'host': 'localhost',
'database': 'squidTag',
},}
# function to read in config file
def __init__(self):
import ConfigParser, optparse, os
parser = optparse.OptionParser()
parser.add_option('-c', '--config', dest = 'config',
help = 'config file location', metavar = 'FILE',
default = '/usr/local/etc/squid-tagger.conf')
parser.add_option('-d', '--dump', dest = 'dump',
help = 'dump database', action = 'store_true', metavar = 'bool',
default = False)
parser.add_option('-f', '--flush-database', dest = 'flush_db',
help = 'flush previous database on load', default = False,
action = 'store_true', metavar = 'bool')
parser.add_option('-l', '--load', dest = 'load',
help = 'load database', action = 'store_true', metavar = 'bool',
default = False)
parser.add_option('-D', '--dump-conf', dest = 'dump_conf',
help = 'dump filtering rules', default = False, metavar = 'bool',
action = 'store_true')
parser.add_option('-L', '--load-conf', dest = 'load_conf',
help = 'load filtering rules', default = False, metavar = 'bool',
action = 'store_true')
(self.options, args) = parser.parse_args()
assert os.access(self.options.config, os.R_OK), "Fatal error: can't read {}".format(self.options.config)
self._config = ConfigParser.ConfigParser()
self._config.readfp(open(self.options.config))
# function to select config file section or create one
def section(self, section):
if not self._config.has_section(section):
self._config.add_section(section)
self._section = section
# function to get config parameter, if parameter doesn't exists the default
# value or None is substituted
def __getitem__(self, name):
if not self._config.has_option(self._section, name):
if self._section in self._default:
if name in self._default[self._section]:
self._config.set(self._section, name, self._default[self._section][name])
else:
self._config.set(self._section, name, None)
else:
self._config.set(self._section, name, None)
return(self._config.get(self._section, name))
# initializing and reading in config file
config = Config()
# wrapper around syslog, can be muted
class Logger(object):
__slots__ = frozenset(['_syslog'])
def __init__(self):
config.section('log')
if config['silent'] == 'yes':
self._syslog = None
else:
import syslog
self._syslog = syslog
self._syslog.openlog(str('squidTag'))
def info(self, message):
if self._syslog != None:
self._syslog.syslog(self._syslog.LOG_INFO, message)
def notice(self, message):
if self._syslog != None:
self._syslog.syslog(self._syslog.LOG_NOTICE, message)
logger = Logger()
# tiny wrapper around a file to make reads from it geventable
# or should i move this somewhere?
class FReadlineQueue(gevent.queue.Queue):
# storing file descriptor, leftover
__slots__ = frozenset(['_fd', '_tail'])
def __init__(self, fd):
# initialising class
gevent.queue.Queue.__init__(self)
# storing file descriptor
self._fd = fd
# using empty tail
self._tail = ''
# setting up event
self._install_wait()
def _install_wait(self):
fileno = self._fd.fileno()
# putting file to nonblocking mode
fcntl.fcntl(fileno, fcntl.F_SETFL, fcntl.fcntl(fileno, fcntl.F_GETFL) | os.O_NONBLOCK)
# installing event handler
gevent.core.read_event(fileno, self._wait_helper)
def _wait_helper(self, ev, evtype):
# reading one buffer from stream
buf = self._fd.read(4096)
# splitting stream by line ends
rows = buf.decode('l1').split('\n')
# adding tail to the first element if there is some tail
if len(self._tail) > 0:
rows[0] = self._tail + rows[0]
# popping out last (incomplete) element
self._tail = rows.pop(-1)
# dropping all complete elements to the queue
for row in rows:
self.put_nowait(row)
logger.info('request: ' + row)
if len(buf) > 0:
# no EOF, reinstalling event handler
gevent.core.read_event(self._fd.fileno(), self._wait_helper)
else:
# EOF found, sending EOF to queue
self.put_nowait(None)
stdin = FReadlineQueue(sys.stdin)
# wrapper around database
class tagDB(object):
__slots__ = frozenset(['_cursor', '_db'])
def __init__(self):
config.section('database')
self._db = psycopg2.connect(
database = config['database'],
host = config['host'],
user = config['user'],
password = config['password'],
)
self._cursor = self._db.cursor()
def _field_names(self):
names = []
for record in self._cursor.description:
names.append(record.name)
return(names)
def check(self, site, ip_address):
self._cursor.execute("select * from (select redirect_url, regexp from site_rule where site <@ tripdomain(%s) and netmask >>= %s order by array_length(site, 1) desc) a group by redirect_url, regexp", [site, ip_address])
return(self._cursor.fetchall())
def dump(self):
self._cursor.execute("select untrip(site) as site, tag::text, regexp from urls order by site, tag")
return(self._field_names(), self._cursor.fetchall())
def load(self, data):
if config.options.flush_db:
self._cursor.execute('delete from urls;')
bundle = []
for row in data:
if len(row) == 2:
bundle.append([row[0], row[1], None])
else:
bundle.append([row[0], row[1], row[2]])
self._cursor.executemany("insert into urls (site, tag, regexp) values (tripdomain(%s), %s, %s)", bundle)
self._cursor.execute("update urls set regexp = NULL where regexp = ''")
self._db.commit()
def load_conf(self, csv_data):
self._cursor.execute('delete from rules;')
bundle = []
for row in csv_data:
bundle.append([row[0], row[1], int(row[2]), int(row[3]), row[4], row[5], row[6]])
self._cursor.executemany("insert into rules (netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag) values (%s::text::cidr, %s, %s, %s, %s::text::time, %s::text::time, %s::text::text[])", bundle)
self._db.commit()
def dump_conf(self):
self._cursor.execute("select netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag::text from rules")
return(self._field_names(), self._cursor.fetchall())
# abstract class with basic checking functionality
class Checker(object):
__slots__ = frozenset(['_db', '_log', '_queue', '_request'])
def __init__(self, queue, logger):
self._db = tagDB()
self._log = logger
self._log.info('started\n')
self._request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$')
self._queue = queue
def process(self, id, site, ip_address, url_path, line = None):
#self._log.info('trying {}\n'.format(site))
result = self._db.check(site, ip_address)
reply = None
#self._log.info('got {} lines from database'.format(len(result)))
for row in result:
if row != None and row[0] != None:
if row[1] != None:
self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], url_path))
try:
if re.compile(row[1]).match(url_path):
reply = row[0].format(url_path)
else:
continue
except:
self._log.info("can't compile regexp")
else:
reply = row[0].format(url_path)
if reply != None:
self.writeline('{} {}\n'.format(id, reply))
return(True)
self.writeline('{}\n'.format(id))
def check(self):
while True:
line = self._queue.get()
if line == None:
break
#self._log.info('request: ' + line)
request = self._request.match(line)
if request:
id = request.group(1)
#proto = request.group(2)
site = request.group(3)
url_path = request.group(4)
ip_address = request.group(5)
self.process(id, site, ip_address, url_path, line)
else:
self._log.info('bad request\n')
self.writeline(line + '\n')
def writeline(self, string):
self._log.info('sending: ' + string)
sys.stdout.write(string)
sys.stdout.flush()
def loop(self):
pool = gevent.pool.Pool()
pool.spawn(self.check)
pool.join()
if config.options.dump or config.options.load or config.options.dump_conf or config.options.load_conf:
import csv
tagdb = tagDB()
data_fields = ['site', 'tag', 'regexp']
conf_fields = ['netmask', 'redirect_url', 'from_weekday', 'to_weekday', 'from_time', 'to_time', 'tag']
if config.options.dump or config.options.dump_conf:
csv_writer = csv.writer(sys.stdout)
if config.options.dump:
dump = tagdb.dump()
elif config.options.dump_conf:
dump = tagdb.dump_conf()
csv_writer.writerow(dump[0])
for line in dump[1]:
csv_writer.writerow(line)
elif config.options.load or config.options.load_conf:
csv_reader = csv.reader(sys.stdin)
first_row = next(csv_reader)
if config.options.load:
fields = data_fields
load = tagdb.load
elif config.options.load_conf:
fields = conf_fields
load = tagdb.load_conf
assert first_row == fields, 'File must contain csv data with theese columns: ' + repr(fields)
load(csv_reader)
else:
# main loop
Checker(stdin, logger).loop()