0ef24b1937 2011-04-06 1: #!/usr/bin/env python
0ef24b1937 2011-04-06 2:
0ef24b1937 2011-04-06 3: from __future__ import division, print_function, unicode_literals
0ef24b1937 2011-04-06 4:
0ef24b1937 2011-04-06 5: import gevent.monkey
0ef24b1937 2011-04-06 6: gevent.monkey.patch_all()
0ef24b1937 2011-04-06 7:
0ef24b1937 2011-04-06 8: import fcntl, gevent.core, gevent.pool, gevent.queue, gevent.socket, os, psycopg2, re, sys
0ef24b1937 2011-04-06 9:
0ef24b1937 2011-04-06 10: # //inclusion start
0ef24b1937 2011-04-06 11: # Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
0ef24b1937 2011-04-06 12: # and licensed under the MIT license:
0ef24b1937 2011-04-06 13:
def gevent_wait_callback(conn, timeout=None):
    """Wait, via gevent, until the connection reports it is ready.

    The connection is polled repeatedly.  Whenever it asks to read or to
    write, the call waits on the connection's file descriptor through
    gevent.socket and polls again.

    conn    -- object providing poll() and fileno()
    timeout -- forwarded unchanged to gevent.socket.wait_read/wait_write

    Returns None once poll() yields POLL_OK.
    Raises psycopg2.OperationalError on any other poll() result.
    """
    ext = psycopg2.extensions
    # poll state -> gevent primitive that waits for that readiness
    waiters = {
        ext.POLL_READ: gevent.socket.wait_read,
        ext.POLL_WRITE: gevent.socket.wait_write,
    }
    while True:
        state = conn.poll()
        if state == ext.POLL_OK:
            return
        wait = waiters.get(state)
        if wait is None:
            raise psycopg2.OperationalError("Bad result from poll: %r" % state)
        wait(conn.fileno(), timeout=timeout)
0ef24b1937 2011-04-06 26:
# Register the gevent-aware wait callback with psycopg2; refuse to start when
# the installed psycopg2 has no set_wait_callback hook.
if hasattr(psycopg2.extensions, 'set_wait_callback'):
    psycopg2.extensions.set_wait_callback(gevent_wait_callback)
else:
    raise ImportError("support for coroutines not available in this Psycopg version (%s)" % psycopg2.__version__)
0ef24b1937 2011-04-06 30:
0ef24b1937 2011-04-06 31: # //inclusion end
0ef24b1937 2011-04-06 32:
0ef24b1937 2011-04-06 33: # tiny wrapper around a file to make reads from it geventable
0ef24b1937 2011-04-06 34: # or should i move this somewhere?
0ef24b1937 2011-04-06 35:
class FReadlineQueue(gevent.queue.Queue):
    """Queue fed with the lines of a file, read through gevent read events.

    The wrapped file is switched to non-blocking mode.  Each time a read
    event fires, up to 4096 bytes are read and split on '\\n'; every complete
    line (without its newline) is put on the queue.  At end of file any
    unterminated last line is queued as well, followed by None as an
    end-of-stream marker.

    fd -- file-like object providing fileno() and read(size)
    """
    # storing file descriptor, leftover
    __slots__ = frozenset(['_fd', '_tail'])

    def __init__(self, fd):
        gevent.queue.Queue.__init__(self)
        # file object lines are read from
        self._fd = fd
        # text read so far that has not been terminated by a newline yet
        self._tail = ''
        self._install_wait()

    def _install_wait(self):
        """Put the file into non-blocking mode and arm the first read event."""
        fileno = self._fd.fileno()
        flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
        fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        gevent.core.read_event(fileno, self._wait_helper)

    def _wait_helper(self, ev, evtype):
        """Read event handler: queue every complete line that is available.

        ev and evtype are supplied by the event registration and are not used.
        """
        buf = self._fd.read(4096)
        if len(buf) == 0:
            # EOF: a last line without trailing newline would otherwise be
            # lost, so hand it over before the end-of-stream marker
            if len(self._tail) > 0:
                self.put_nowait(self._tail)
                self._tail = ''
            self.put_nowait(None)
            return
        rows = buf.split('\n')
        # the first piece continues whatever was left over from the last read
        if len(self._tail) > 0:
            rows[0] = self._tail + rows[0]
        # the last piece is incomplete (or empty) until the next newline shows up
        self._tail = rows.pop(-1)
        for row in rows:
            self.put_nowait(row)
        # the handler is registered again after each read, so re-arm it here
        gevent.core.read_event(self._fd.fileno(), self._wait_helper)
d500448801 2009-10-05 76:
# Line queue fed from standard input.  Constructing it already switches
# sys.stdin to non-blocking mode and registers a read event, whichever
# command line mode is selected further down.
stdin = FReadlineQueue(sys.stdin)
d500448801 2009-10-05 78:
b93dc49210 2009-10-13 79: # wrapper around syslog, can be muted
class Logger(object):
    """Thin syslog front end that can be silenced from the configuration.

    When option 'silent' of config section 'log' equals 'yes' nothing is
    logged; otherwise messages go to syslog under the ident 'squidTag'.
    """
    __slots__ = frozenset(['_syslog'])

    def __init__(self):
        config.section('log')
        if config['silent'] == 'yes':
            # muted: no syslog module is kept around
            self._syslog = None
        else:
            import syslog
            self._syslog = syslog
            self._syslog.openlog(str('squidTag'))

    def _emit(self, priority_name, message):
        """Send message with the named syslog priority unless muted."""
        if self._syslog is not None:
            self._syslog.syslog(getattr(self._syslog, priority_name), message)

    def info(self, message):
        """Log message with priority LOG_INFO."""
        self._emit('LOG_INFO', message)

    def notice(self, message):
        """Log message with priority LOG_NOTICE."""
        self._emit('LOG_NOTICE', message)
d500448801 2009-10-05 99:
b93dc49210 2009-10-13 100: # wrapper around database
class tagDB(object):
    """Database access layer: rule lookup plus CSV style dump/load helpers.

    Connection parameters are taken from section 'database' of the global
    config object.
    """
    __slots__ = frozenset(['_cursor', '_db'])

    def __init__(self):
        config.section('database')
        self._db = psycopg2.connect(
            database=config['database'],
            host=config['host'],
            user=config['user'],
            password=config['password'],
        )
        self._cursor = self._db.cursor()

    def _field_names(self):
        """Return the column names of the most recently executed query."""
        return [column.name for column in self._cursor.description]

    def check(self, site, ip_address):
        """Return all (redirect_url, regexp) rows of site_rule selected for
        site and ip_address, in the order produced by the query."""
        self._cursor.execute("select redirect_url, regexp from site_rule where site <@ tripdomain(%s) and netmask >>= %s order by array_length(site, 1) desc", [site, ip_address])
        return self._cursor.fetchall()

    def dump(self):
        """Return (column names, rows) for the whole urls table."""
        self._cursor.execute("select untrip(site) as site, tag::text, regexp from urls order by site, tag")
        return (self._field_names(), self._cursor.fetchall())

    def load(self, data):
        """Insert rows of (site, tag[, regexp]) into urls and commit.

        data -- iterable of sequences; a two element row gets a NULL regexp.
        Empty rows (e.g. blank lines seen by a CSV reader) are skipped.
        The urls table is emptied first when option flush_db is set.
        """
        if config.options.flush_db:
            self._cursor.execute('delete from urls;')
        bundle = []
        for row in data:
            if not row:
                # blank input line, nothing to insert
                continue
            regexp = None if len(row) == 2 else row[2]
            bundle.append([row[0], row[1], regexp])
        self._cursor.executemany("insert into urls (site, tag, regexp) values (tripdomain(%s), %s, %s)", bundle)
        # an empty regexp column in the input means "no regexp"
        self._cursor.execute("update urls set regexp = NULL where regexp = ''")
        self._db.commit()

    def load_conf(self, csv_data):
        """Replace the contents of table rules with csv_data and commit.

        csv_data -- iterable of seven element rows: netmask, redirect_url,
        from_weekday, to_weekday, from_time, to_time, tag.  The two weekday
        fields are converted with int().  Empty rows are skipped.
        """
        self._cursor.execute('delete from rules;')
        bundle = [
            [row[0], row[1], int(row[2]), int(row[3]), row[4], row[5], row[6]]
            for row in csv_data
            if row
        ]
        self._cursor.executemany("insert into rules (netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag) values (%s::text::cidr, %s, %s, %s, %s::text::time, %s::text::time, %s::text::text[])", bundle)
        self._db.commit()

    def dump_conf(self):
        """Return (column names, rows) for the whole rules table."""
        self._cursor.execute("select netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag::text from rules")
        return (self._field_names(), self._cursor.fetchall())
b93dc49210 2009-10-13 152:
b93dc49210 2009-10-13 153: # abstract class with basic checking functionality
class Checker(object):
    """Reads request lines from a queue and answers each one on stdout.

    Every request is looked up in the database; the first applicable rule
    yields a "<id> <redirect>" answer, otherwise just "<id>" is written.
    """
    __slots__ = frozenset(['_db', '_log', '_queue', '_request'])

    def __init__(self, queue):
        """queue -- object whose get() returns request lines and finally None."""
        self._db = tagDB()
        self._log = Logger()
        self._log.info('started\n')
        # request line layout: id, url (http/ftp only), client address/name,
        # user, method; groups 1, 3, 4 and 5 are used by check()
        self._request = re.compile(r'^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$')
        self._queue = queue

    def _matches(self, pattern, url_path):
        """Tell whether url_path matches the rule's regexp.

        A regexp that does not compile is logged and counts as no match.
        """
        self._log.info('trying regexp "{}" versus "{}"\n'.format(pattern, url_path))
        try:
            compiled = re.compile(pattern)
        except re.error:
            self._log.info("can't compile regexp")
            return False
        return compiled.match(url_path) is not None

    def _format_reply(self, template, url_path):
        """Fill url_path into the redirect template.

        Returns None (after logging) when the template cannot be formatted,
        so that one broken rule does not abort request processing.
        """
        try:
            return template.format(url_path)
        except (AttributeError, IndexError, KeyError, ValueError):
            self._log.info("can't format redirect url")
            return None

    def process(self, id, site, ip_address, url_path, line = None):
        """Answer one request.

        id         -- request id, echoed at the start of the answer
        site       -- host part of the requested url
        ip_address -- client address
        url_path   -- path part of the requested url
        line       -- original request line (unused here)

        Returns True when a redirect was written, False when the plain id
        was written because no rule applied.
        """
        for row in self._db.check(site, ip_address):
            if row is None or row[0] is None:
                continue
            template, pattern = row[0], row[1]
            # rules carrying a regexp only apply when the path matches it
            if pattern is not None and not self._matches(pattern, url_path):
                continue
            reply = self._format_reply(template, url_path)
            if reply is not None:
                self.writeline('{} {}\n'.format(id, reply))
                return True
        self.writeline('{}\n'.format(id))
        return False

    def check(self):
        """Consume request lines until the queue delivers None."""
        for line in iter(self._queue.get, None):
            self._log.info('request: ' + line)
            request = self._request.match(line)
            if request:
                # group 2 (protocol) is not needed
                self.process(request.group(1), request.group(3), request.group(5), request.group(4), line)
            else:
                # unparsable lines are echoed back unchanged
                self._log.info('bad request\n')
                self.writeline(line + '\n')

    def writeline(self, string):
        """Log string, write it to stdout and flush immediately."""
        self._log.info('sending: ' + string)
        sys.stdout.write(string)
        sys.stdout.flush()

    def loop(self):
        """Run check() in a greenlet and wait until it finishes."""
        pool = gevent.pool.Pool()
        pool.spawn(self.check)
        pool.join()
fc934cead1 2009-10-13 213:
fc934cead1 2009-10-13 214: # this class processes the config file and substitutes default values
class Config(object):
    """Command line options plus config file values with built-in defaults.

    Usage: select a section with section(), then read options by indexing,
    e.g. config.section('log'); config['silent'].  Parsed command line
    options are available as the attribute `options`.
    """
    # `_default` is a class-level table shared by all instances, so it is
    # deliberately not listed as a slot
    __slots__ = frozenset(['_config', '_section', 'options'])
    _default = {
        'log': {
            'silent': 'no',
        },
        'database': {
            'host': 'localhost',
            'database': 'squidTag',
        },
    }

    # function to read in config file
    def __init__(self):
        """Parse the command line and read the config file it points to.

        Raises IOError when the config file is not readable.
        """
        import ConfigParser, optparse, os

        parser = optparse.OptionParser()
        parser.add_option('-c', '--config', dest = 'config',
            help = 'config file location', metavar = 'FILE',
            default = '/usr/local/etc/squid-tagger.conf')
        parser.add_option('-d', '--dump', dest = 'dump',
            help = 'dump database', action = 'store_true', metavar = 'bool',
            default = False)
        parser.add_option('-f', '--flush-database', dest = 'flush_db',
            help = 'flush previous database on load', default = False,
            action = 'store_true', metavar = 'bool')
        parser.add_option('-l', '--load', dest = 'load',
            help = 'load database', action = 'store_true', metavar = 'bool',
            default = False)
        parser.add_option('-D', '--dump-conf', dest = 'dump_conf',
            help = 'dump filtering rules', default = False, metavar = 'bool',
            action = 'store_true')
        parser.add_option('-L', '--load-conf', dest = 'load_conf',
            help = 'load filtering rules', default = False, metavar = 'bool',
            action = 'store_true')

        # positional arguments are not used
        self.options, _ = parser.parse_args()

        # explicit check instead of assert, which is skipped under python -O
        if not os.access(self.options.config, os.R_OK):
            raise IOError("Fatal error: can't read {}".format(self.options.config))

        self._config = ConfigParser.ConfigParser()
        with open(self.options.config) as config_file:
            self._config.readfp(config_file)

    # function to select config file section or create one
    def section(self, section):
        """Make `section` the current section, creating it when missing."""
        if not self._config.has_section(section):
            self._config.add_section(section)
        self._section = section

    # function to get config parameter, if the parameter doesn't exist the
    # default value or None is substituted
    def __getitem__(self, name):
        """Return option `name` of the current section.

        Falls back to the built-in default for that section and option, and
        to None when there is no default either.
        """
        if self._config.has_option(self._section, name):
            return self._config.get(self._section, name)
        return self._default.get(self._section, {}).get(name)
fc934cead1 2009-10-13 275:
fc934cead1 2009-10-13 276: # initializing and reading in config file
# Global configuration object; constructing it parses the command line and
# reads the config file.  Logger, tagDB and the dispatch code below use it.
config = Config()
fc934cead1 2009-10-13 278:
# Command line dispatch: dump or load one of the two tables as CSV, or, when
# no such option was given, serve requests arriving on stdin.
if config.options.dump or config.options.load or config.options.dump_conf or config.options.load_conf:
    import csv

    tagdb = tagDB()
    # expected CSV header for --load and for --load-conf respectively
    data_fields = ['site', 'tag', 'regexp']
    conf_fields = ['netmask', 'redirect_url', 'from_weekday', 'to_weekday', 'from_time', 'to_time', 'tag']

    if config.options.dump or config.options.dump_conf:
        csv_writer = csv.writer(sys.stdout)
        if config.options.dump:
            dump = tagdb.dump()
        else:
            dump = tagdb.dump_conf()

        # header row first, then the data rows
        csv_writer.writerow(dump[0])
        csv_writer.writerows(dump[1])

    else:
        # stdin was switched to non-blocking mode when the stdin line queue
        # was created; the CSV reader reads it directly, so switch it back
        # or a slow pipe could fail with EAGAIN
        stdin_fd = sys.stdin.fileno()
        fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fcntl.fcntl(stdin_fd, fcntl.F_GETFL) & ~os.O_NONBLOCK)

        csv_reader = csv.reader(sys.stdin)
        # None on empty input, which then fails the header check below
        first_row = next(csv_reader, None)

        if config.options.load:
            fields = data_fields
            load = tagdb.load
        else:
            fields = conf_fields
            load = tagdb.load_conf

        # explicit check instead of assert, which is skipped under python -O
        if first_row != fields:
            raise ValueError('File must contain csv data with theese columns: ' + repr(fields))
        load(csv_reader)

else:
    # main loop
    Checker(stdin).loop()