Overview
Comment:     back to python2.7 moving to gevent
Downloads:   Tarball | ZIP archive | SQL archive
Timelines:   family | ancestors | descendants | both | master | trunk
Files:       files | file ages | folders
SHA3-256:    0ef24b1937f5e72873cdaae037ffb6b0
User & Date: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 on 2011-04-06 13:26:44.000
Other Links: branch diff | manifest | tags
Context
2011-06-05 22:38  better logging deinternaliation of requests shrinking list of matching entries  (check-in: 39b97ced92, user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4, tags: master, trunk)
2011-04-06 13:26  back to python2.7 moving to gevent  (check-in: 0ef24b1937, user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4, tags: master, trunk)
2010-12-07 15:49  individual ip support fixed  (check-in: a3d53162db, user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4, tags: master, trunk)
Changes
Modified squid-tagger.py from [1fb6da0d80] to [64da6cdc94].
#!/usr/bin/env python

from __future__ import division, print_function, unicode_literals

import gevent.monkey
gevent.monkey.patch_all()

import fcntl, gevent.core, gevent.pool, gevent.queue, gevent.socket, os, psycopg2, re, sys

# //inclusion start
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
# and licensed under the MIT license:

def gevent_wait_callback(conn, timeout=None):
    """A wait callback useful to allow gevent to work with Psycopg."""
    while 1:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_READ:
            gevent.socket.wait_read(conn.fileno(), timeout=timeout)
        elif state == psycopg2.extensions.POLL_WRITE:
            gevent.socket.wait_write(conn.fileno(), timeout=timeout)
        else:
            raise psycopg2.OperationalError("Bad result from poll: %r" % state)

if not hasattr(psycopg2.extensions, 'set_wait_callback'):
    raise ImportError("support for coroutines not available in this Psycopg version (%s)" % psycopg2.__version__)

psycopg2.extensions.set_wait_callback(gevent_wait_callback)
# //inclusion end
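# --- editor's note, not part of this check-in ---
# A minimal sketch of what the wait callback buys: with set_wait_callback()
# installed, a blocking query yields to the gevent hub instead of stalling
# the process, so several greenlets can wait on PostgreSQL concurrently.
# The connection parameters below are hypothetical.
#
#   import gevent, psycopg2
#
#   def sleeper(n):
#       conn = psycopg2.connect(database='squidTag')
#       cur = conn.cursor()
#       cur.execute('select pg_sleep(2)')  # cooperatively yields while waiting
#       print('greenlet {} done'.format(n))
#
#   # with the callback installed these finish in ~2 seconds total, not ~6
#   gevent.joinall([gevent.spawn(sleeper, n) for n in range(3)])
# --- end note ---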
# tiny wrapper around a file to make reads from it geventable
# or should i move this somewhere?
class FReadlineQueue(gevent.queue.Queue):
    # storing file descriptor, leftover
    __slots__ = frozenset(['_fd', '_tail'])

    def __init__(self, fd):
        # initialising class
        gevent.queue.Queue.__init__(self)
        # storing file descriptor
        self._fd = fd
        # using empty tail
        self._tail = ''
        # setting up event
        self._install_wait()

    def _install_wait(self):
        fileno = self._fd.fileno()
        # putting file to nonblocking mode
        fcntl.fcntl(fileno, fcntl.F_SETFL, fcntl.fcntl(fileno, fcntl.F_GETFL) | os.O_NONBLOCK)
        # installing event handler
        gevent.core.read_event(fileno, self._wait_helper)

    def _wait_helper(self, ev, evtype):
        # reading one buffer from stream
        buf = self._fd.read(4096)
        # splitting stream by line ends
        rows = buf.split('\n')
        # adding tail to the first element if there is some tail
        if len(self._tail) > 0:
            rows[0] = self._tail + rows[0]
        # popping out last (incomplete) element
        self._tail = rows.pop(-1)
        # dropping all complete elements to the queue
        for row in rows:
            self.put_nowait(row)
        if len(buf) > 0:
            # no EOF, reinstalling event handler
            gevent.core.read_event(self._fd.fileno(), self._wait_helper)
        else:
            # EOF found, sending EOF to queue
            self.put_nowait(None)

stdin = FReadlineQueue(sys.stdin)

# wrapper around syslog, can be muted
class Logger(object):
    __slots__ = frozenset(['_syslog'])

    def __init__(self):
        config.section('log')
        if config['silent'] == 'yes':
            self._syslog = None
        else:
            import syslog
            self._syslog = syslog
            self._syslog.openlog(str('squidTag'))

    def info(self, message):
        if self._syslog != None:
            self._syslog.syslog(self._syslog.LOG_INFO, message)

    def notice(self, message):
        if self._syslog != None:
            self._syslog.syslog(self._syslog.LOG_NOTICE, message)

# wrapper around database
class tagDB(object):
    __slots__ = frozenset(['_cursor', '_db'])

    def __init__(self):
        config.section('database')
        self._db = psycopg2.connect(
            database = config['database'],
            host = config['host'],
            user = config['user'],
            password = config['password'],
        )
        self._cursor = self._db.cursor()

    def _field_names(self):
        names = []
        for record in self._cursor.description:
            names.append(record.name)
        return(names)

    def check(self, site, ip_address):
        self._cursor.execute("select redirect_url, regexp from site_rule where site <@ tripdomain(%s) and netmask >>= %s order by array_length(site, 1) desc", [site, ip_address])
        return(self._cursor.fetchall())

    def dump(self):
        self._cursor.execute("select untrip(site) as site, tag::text, regexp from urls order by site, tag")
        return(self._field_names(), self._cursor.fetchall())

    def load(self, data):
        if config.options.flush_db:
            self._cursor.execute('delete from urls;')
        bundle = []
        for row in data:
            if len(row) == 2:
                bundle.append([row[0], row[1], None])
            else:
                bundle.append([row[0], row[1], row[2]])
        self._cursor.executemany("insert into urls (site, tag, regexp) values (tripdomain(%s), %s, %s)", bundle)
        self._cursor.execute("update urls set regexp = NULL where regexp = ''")
        self._db.commit()

    def load_conf(self, csv_data):
        self._cursor.execute('delete from rules;')
        bundle = []
        for row in csv_data:
            bundle.append([row[0], row[1], int(row[2]), int(row[3]), row[4], row[5], row[6]])
        self._cursor.executemany("insert into rules (netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag) values (%s::text::cidr, %s, %s, %s, %s::text::time, %s::text::time, %s::text::text[])", bundle)
        self._db.commit()

    def dump_conf(self):
        self._cursor.execute("select netmask, redirect_url, from_weekday, to_weekday, from_time, to_time, tag::text from rules")
        return(self._field_names(), self._cursor.fetchall())
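# --- editor's note, not part of this check-in ---
# Usage sketch for tagDB.check(): it returns (redirect_url, regexp) rows,
# apparently ordered so the most specific site match comes first
# (array_length ... desc). The site and address below are made up.
#
#   db = tagDB()
#   for redirect_url, regexp in db.check('www.example.com', '192.0.2.1'):
#       print(redirect_url, regexp)
# --- end note ---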
# abstract class with basic checking functionality
class Checker(object):
    __slots__ = frozenset(['_db', '_log', '_queue', '_request'])

    def __init__(self, queue):
        self._db = tagDB()
        self._log = Logger()
        self._log.info('started\n')
        self._request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$')
        self._queue = queue

    def process(self, id, site, ip_address, url_path, line = None):
        #self._log.info('trying {}\n'.format(site))
        result = self._db.check(site, ip_address)
        reply = None
        for row in result:
            if row != None and row[0] != None:
                if row[1] != None:
                    self._log.info('trying regexp "{}" versus "{}"\n'.format(row[1], url_path))
                    try:
                        if re.compile(row[1]).match(url_path):
                            reply = row[0].format(url_path)
                        else:
                            continue
                    except:
                        self._log.info("can't compile regexp")
                else:
                    reply = row[0].format(url_path)
            if reply != None:
                self.writeline('{} {}\n'.format(id, reply))
                return(True)
        self.writeline('{}\n'.format(id))

    def check(self):
        while True:
            line = self._queue.get()
            if line == None:
                break
            self._log.info('request: ' + line)
            request = self._request.match(line)
            if request:
                id = request.group(1)
                #proto = request.group(2)
                site = request.group(3)
                url_path = request.group(4)
                ip_address = request.group(5)
                self.process(id, site, ip_address, url_path, line)
            else:
                self._log.info('bad request\n')
                self.writeline(line + '\n')

    def writeline(self, string):
        self._log.info('sending: ' + string)
        sys.stdout.write(string)
        sys.stdout.flush()

    def loop(self):
        pool = gevent.pool.Pool()
        pool.spawn(self.check)
        pool.join()
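# --- editor's note, not part of this check-in ---
# The regexp in Checker parses squid's redirector protocol: one request per
# line on stdin, one reply per line on stdout. A made-up exchange:
#
#   in:  0 http://www.example.com/page 192.0.2.1/- - GET
#   out: 0 http://blocked.example.net/page   (a rule matched; url rewritten)
#   out: 0                                   (no rule matched; pass through)
# --- end note ---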
# this class processes config file and substitutes default values
class Config:
    __slots__ = frozenset(['_config', '_default', '_section', 'options'])
    _default = {
        'log': {
            'silent': 'no',
        },
        'database': {
            'host': 'localhost',
            'database': 'squidTag',
        },}

    # function to read in config file
    def __init__(self):
        import ConfigParser, optparse, os
        parser = optparse.OptionParser()
        parser.add_option('-c', '--config', dest = 'config',
            help = 'config file location', metavar = 'FILE',
            default = '/usr/local/etc/squid-tagger.conf')
        parser.add_option('-d', '--dump', dest = 'dump',
            help = 'dump database', action = 'store_true', metavar = 'bool',

︙
            help = 'load filtering rules',
            default = False, metavar = 'bool',
            action = 'store_true')
        (self.options, args) = parser.parse_args()
        assert os.access(self.options.config, os.R_OK), "Fatal error: can't read {}".format(self.options.config)
        self._config = ConfigParser.ConfigParser()
        self._config.readfp(open(self.options.config))

    # function to select config file section or create one
    def section(self, section):
        if not self._config.has_section(section):
            self._config.add_section(section)
        self._section = section
︙
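# --- editor's note, not part of this check-in ---
# A minimal config file this Config class would accept, based on the sections
# and keys referenced above ('log'/'silent'; 'database'/'host', 'database',
# 'user', 'password'); all values here are made up:
#
#   [log]
#   silent = no
#
#   [database]
#   host = localhost
#   database = squidTag
#   user = squid
#   password = secret
# --- end note ---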
import csv

tagdb = tagDB()
data_fields = ['site', 'tag', 'regexp']
conf_fields = ['netmask', 'redirect_url', 'from_weekday', 'to_weekday', 'from_time', 'to_time', 'tag']

if config.options.dump or config.options.dump_conf:
    csv_writer = csv.writer(sys.stdout)
    if config.options.dump:
        dump = tagdb.dump()
    elif config.options.dump_conf:
        dump = tagdb.dump_conf()
    csv_writer.writerow(dump[0])
    for line in dump[1]:
        csv_writer.writerow(line)
elif config.options.load or config.options.load_conf:
    csv_reader = csv.reader(sys.stdin)
    first_row = next(csv_reader)
    if config.options.load:
        fields = data_fields
        load = tagdb.load
    elif config.options.load_conf:
        fields = conf_fields
        load = tagdb.load_conf
    assert first_row == fields, 'File must contain csv data with these columns: ' + repr(fields)
    load(csv_reader)
else:
    # main loop
    Checker(stdin).loop()
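# --- editor's note, not part of this check-in ---
# Plausible invocations, assuming the option names wired up above; only
# -c/--config and -d/--dump are visible in this diff, and the remaining
# flags are defined in the elided part of Config:
#
#   squid-tagger.py --dump > urls.csv    # dump the url/tag table as csv
#   squid-tagger.py -c /usr/local/etc/squid-tagger.conf   # run as redirector
# --- end note ---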