Overview
Comment: | Finally, reactor support. The plain and threaded reactors are tested and working well. The kqueue reactor is not ready for use yet. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | master | trunk |
Files: | files | file ages | folders |
SHA3-256: | ed7808827d82157d37343048186ba5c5 |
User & Date: | c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 on 2009-10-14 14:16:17.000 |
Other Links: | branch diff | manifest | tags |
Context
2009-10-27
| ||
15:27 | Now the mark function returns the id of the added site. check-in: 318311c7d2 user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 tags: master, trunk | |
2009-10-14
| ||
14:16 | Finally, reactor support. The plain and threaded reactors are tested and working well. The kqueue reactor is not ready for use yet. check-in: ed7808827d user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 tags: master, trunk | |
09:04 | Added 'or replace' for better function updating, reordered functions to work as found in file. check-in: 67e762b39b user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 tags: master, trunk | |
Changes
Modified squid-tagger.conf
from [3b38389a60]
to [cab21d427f].
︙ | ︙ | |||
16 17 18 19 20 21 22 | password = password # This section can be used to turn off some logging. [log] # There would be no logs generated if 'silent' is set to 'yes'. silent = no | > > > > > > | 16 17 18 19 20 21 22 23 24 25 26 27 28 | password = password # This section can be used to turn off some logging. [log] # There would be no logs generated if 'silent' is set to 'yes'. silent = no # This section controls which reactor services the main loop. [reactor] # Possible reactor types: plain, thread and kqueue (broken) reactor = thread |
Modified squid-tagger.py
from [b2dec98d3a]
to [1d253fe455].
︙ | ︙ | |||
45 46 47 48 49 50 51 | return(self._db) def check(self, site, ip_address): return(self._check_stmt(site, ip_address)) # abstract class with basic checking functionality class Checker: | | | | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | return(self._db) def check(self, site, ip_address): return(self._check_stmt(site, ip_address)) # abstract class with basic checking functionality class Checker: __slots__ = frozenset(['_db', '_log']) def __init__(self): self._db = tagDB() self._log = Logger() def process(self, id, site, ip_address, url_path, line = None): self._log.info('trying {}\n'.format(site)) result = self._db.check(site, ip_address) #reply = '{}://{}/{}'.format(req[4], req[1], req[3]) reply = '-' for row in result: if row != None and row[0] != None: if row[1] != None: |
︙ | ︙ | |||
78 79 80 81 82 83 84 | request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$').match(line) if request: id = request.group(1) #proto = request.group(2) site = request.group(3) url_path = request.group(4) ip_address = request.group(5) | | < < < < < > > > > > > > | > > > | | | > | | > | | > > | | | > > > | | > > > > > > > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$').match(line) if request: id = request.group(1) #proto = request.group(2) site = request.group(3) url_path = request.group(4) ip_address = request.group(5) self.process(id, site, ip_address, url_path, line) else: self._log.info('bad request\n') self.writeline(line) def writeline(self, string): self._log.info('sending: ' + string) sys.stdout.write(string) sys.stdout.flush() def loop(self): while True: line = sys.stdin.readline() if len(line) == 0: break self.check(line) # threaded checking facility class CheckerThread(Checker): __slots__ = frozenset(['_lock', '_lock_exit', '_lock_queue', '_queue']) def __init__(self): # basic initialisation Checker.__init__(self) # Spin lock. Loop acquires it on start then releases it when holding queue # lock. This way the thread proceeds without stops while queue has data and # gets stalled when no data present. 
The lock is released by queue writer # after storing something into the queue self._lock = _thread.allocate_lock() self._lock_exit = _thread.allocate_lock() self._lock_queue = _thread.allocate_lock() self._lock.acquire() self._queue = [] _thread.start_new_thread(self._start, ()) def _start(self): while True: self._lock.acquire() with self._lock_queue: # yes this should be written this way, and yes, this is why I hate threading if len(self._queue) > 1: if self._lock.locked(): self._lock.release() req = self._queue.pop(0) Checker.process(self, req[0], req[1], req[2], req[3]) with self._lock_queue: if len(self._queue) == 0: if self._lock_exit.locked(): self._lock_exit.release() def process(self, id, site, ip_address, url_path, line): with self._lock_queue: self._queue.append((id, site, ip_address, url_path)) self._log.info('request {} queued ({})\n'.format(id, line)) if not self._lock_exit.locked(): self._lock_exit.acquire() if self._lock.locked(): self._lock.release() def loop(self): while True: line = sys.stdin.readline() if len(line) == 0: break self.check(line) self._lock_exit.acquire() # kqueue enable class for BSD's XXX broken for now class CheckerKqueue(Checker): __slots__ = frozenset(['_kq', '_select', '_queue']) def __init__(self): # basic initialisation Checker.__init__(self) # importing select module import select self._select = select # kreating kqueue self._kq = self._select.kqueue() assert (self._kq.fileno() != -1) # watching sys.stdin for data self._kq.control([self._select.kevent(sys.stdin, self._select.KQ_FILTER_READ, self._select.KQ_EV_ADD)], 0) # creating data queue self._queue = [] def loop(self): # Wait for data by default timeout = None while True: # checking if there is any data kevs = self._kq.control(None, 1, timeout) if len(kevs) > 0: #kev = kevs[0] # XXX add some code to read only known data size and check for newlines line = sys.stdin.readline() # add data to the queue self.check(line) # don't wait for data, start processing timeout = 0 
else: req = self._queue.pop(0) Checker.process(self, req[0], req[1], req[2], req[3]) if len(self._queue) == 0: # wait for data - we have nothing to process timeout = None def process(self, id, site, ip_address, url_path, line): self._queue.append((id, site, ip_address, url_path)) self._log.info('request {} queued ({})\n'.format(id, line)) # this classes processes config file and substitutes default values class Config: __slots__ = frozenset(['_config', '_default', '_section']) _default = { 'reactor': { 'reactor': 'thread', |
︙ | ︙ | |||
183 184 185 186 187 188 189 190 | # initializing and reading in config file config = Config() config.section('reactor') if config['reactor'] == 'thread': checker = CheckerThread() | > > > > < < < < | | 249 250 251 252 253 254 255 256 257 258 259 260 261 | # initializing and reading in config file config = Config() config.section('reactor') if config['reactor'] == 'thread': checker = CheckerThread() elif config['reactor'] == 'plain': checker = Checker() elif config['reactor'] == 'kqueue': checker = CheckerKqueue() checker.loop() |