135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
# Format every record as "squidTag[<process id>]: <message>" and attach the
# handler to the logger. `handler` and `logger` are created earlier in the
# file (not visible in this section).
handler.setFormatter(logging.Formatter(str('squidTag[%(process)s]: %(message)s')))
logger.addHandler(handler)
# tiny wrapper around a file to make reads from it geventable
# or should i move this somewhere?
class FReadlineQueue(gevent.queue.Queue):
    """Queue that is filled with text lines read from a non-blocking file.

    The file descriptor is switched to non-blocking mode and a gevent read
    event is registered on it.  Every time the event fires, up to 4096 bytes
    are read, decoded with the 'l1' (Latin-1) codec and split on '\\n'.
    Complete lines are put on the queue (without the newline); an incomplete
    trailing piece is kept in ``_tail`` and prepended to the next chunk.

    On end of file any pending unterminated line is flushed to the queue and
    then ``None`` is queued as the EOF marker.
    """
    # storing file descriptor, io interface, leftover
    __slots__ = frozenset(['_io', '_fileno', '_tail'])

    def __init__(self, fd, closefd = True):
        """Start reading lines from *fd*.

        fd      -- object exposing ``fileno()`` (e.g. ``sys.stdin``).
        closefd -- passed through to ``io.FileIO``; when false the
                   descriptor is left open when the FileIO is closed.
        """
        import io
        # initialising class
        gevent.queue.Queue.__init__(self)
        # storing file descriptor
        self._fileno = fd.fileno()
        self._io = io.FileIO(self._fileno, 'r', closefd)
        # using empty tail
        self._tail = ''
        # setting up event
        self._install_wait()

    def _install_wait(self):
        """Put the descriptor into non-blocking mode and arm the read event."""
        flags = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
        fcntl.fcntl(self._fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # installing event handler
        gevent.core.read_event(self._fileno, self._wait_helper)

    def _wait_helper(self, ev, evtype):
        """Read event callback: read one buffer and queue its complete lines.

        ev, evtype -- supplied by gevent's event machinery; not inspected.
        """
        # reading one buffer from stream
        buf = self._io.read(4096)
        if buf is None:
            # non-blocking read had nothing to deliver (FileIO returns None
            # in that case); just wait for the next readiness notification
            gevent.core.read_event(self._fileno, self._wait_helper)
            return
        if len(buf) == 0:
            # EOF found: do not lose a last line that lacks a trailing newline
            if len(self._tail) > 0:
                self.put_nowait(self._tail)
                logger.info('< ' + self._tail)
                self._tail = ''
            # sending EOF to queue
            self.put_nowait(None)
            return
        # splitting stream by line ends
        rows = buf.decode('l1').split('\n')
        # adding tail to the first element if there is some tail
        if len(self._tail) > 0:
            rows[0] = self._tail + rows[0]
        # popping out last (incomplete) element
        self._tail = rows.pop(-1)
        # dropping all complete elements to the queue
        for row in rows:
            self.put_nowait(row)
            logger.info('< ' + row)
        # no EOF, reinstalling event handler
        gevent.core.read_event(self._fileno, self._wait_helper)
# Module-level line queue fed from standard input; the second argument is
# closefd=False, so the underlying descriptor is not closed with the FileIO.
stdin = FReadlineQueue(sys.stdin, False)
# wrapper around a file handle that makes it possible to queue writes without stalling
class FWritelineQueue(gevent.queue.JoinableQueue):
    """Queue whose items are written, one per line, to a non-blocking file.

    ``put()`` enqueues an item and registers a gevent write event.  When the
    descriptor becomes writable, ``_wait_helper`` converts queued items with
    ``str(item).encode('utf-8')``, appends a newline and writes them.  A
    partially written line is remembered in ``_tail`` and finished on the
    next write event.

    Each item is acknowledged with ``task_done()`` once it has been written
    completely, so ``join()`` (used by ``__del__``) returns when the pending
    output has been flushed.
    """
    # storing fileno, io interface, leftover
    __slots__ = frozenset(['_fileno', '_io', '_tail'])

    def __init__(self, fd, closefd = True):
        """Prepare *fd* for queued, non-blocking line output.

        fd      -- object exposing ``fileno()`` (e.g. ``sys.stdout``).
        closefd -- passed through to ``io.FileIO``.
        """
        import io
        # initialising class
        gevent.queue.JoinableQueue.__init__(self)
        # storing fileno
        self._fileno = fd.fileno()
        # creating interface
        self._io = io.FileIO(self._fileno, 'w', closefd)
        # using empty tail (None means "no partially written line")
        self._tail = None
        # putting file to nonblocking mode
        flags = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
        fcntl.fcntl(self._fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def __del__(self):
        """Wait for queued items to be written before the object goes away."""
        # purge queue before deleting
        if not self.empty():
            self.join()

    def put(self, item, block=True, timeout=None):
        """Enqueue *item* and make sure a write event is pending.

        Arguments are those of ``gevent.queue.JoinableQueue.put``.
        """
        # calling real put
        gevent.queue.JoinableQueue.put(self, item, block, timeout)
        # installing event handler
        gevent.core.write_event(self._fileno, self._wait_helper)

    def _wait_helper(self, ev, evtype):
        """Write event callback: write as many queued lines as possible.

        ev, evtype -- supplied by gevent's event machinery; not inspected.
        """
        # XXX ev, evtype checking?
        while True:
            # checking leftover
            if self._tail is None:
                try:
                    item = self.get_nowait()
                except gevent.queue.Empty:
                    # nothing left to write, no need to re-arm the event
                    return
                # b'\n' is the same value as '\n' on Python 2 and keeps the
                # concatenation valid where encode() returns bytes
                self._tail = str(item).encode('utf-8') + b'\n'
            # writing tail
            written = self._io.write(self._tail)
            if written is None:
                # FileIO.write returns None when the non-blocking write
                # would block: nothing was written
                written = 0
            if written >= len(self._tail):
                # whole line is out: acknowledge it so join() can complete
                self._tail = None
                self.task_done()
                continue
            # short write: keep the remainder and wait for writability
            self._tail = self._tail[written:]
            break
        # reinstalling event handler
        gevent.core.write_event(self._fileno, self._wait_helper)
# wrapper around database
class tagDB(object):
__slots__ = frozenset(['_cursor', '_db'])
def __init__(self):
config.section('database')
|
|
|
|
<
<
|
<
<
<
<
<
|
|
|
|
>
|
>
>
>
|
|
|
|
|
|
|
|
|
|
|
|
<
<
<
<
|
|
|
|
|
|
<
|
|
|
<
<
|
|
|
<
|
<
<
<
<
<
|
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
# Format every record as "squidTag[<process id>]: <message>" and attach the
# handler to the logger. `handler` and `logger` are created earlier in the
# file (not visible in this section).
handler.setFormatter(logging.Formatter(str('squidTag[%(process)s]: %(message)s')))
logger.addHandler(handler)
# tiny wrapper around a file to make reads from it geventable
# or should i move this somewhere?
class FReadlineQueue(gevent.queue.Queue):
    """Queue that is filled with text lines read from a non-blocking file.

    A greenlet running ``_frobber`` reads the descriptor in chunks of up to
    4096 bytes through ``gevent.os.nb_read``, decodes them with the 'l1'
    (Latin-1) codec and splits on '\\n'.  Complete lines are put on the
    queue (without the newline); an incomplete trailing piece is kept in
    ``_tail`` and prepended to the next chunk.

    On end of file any pending unterminated line is flushed to the queue and
    then ``None`` is queued as the EOF marker.
    """
    # storing file descriptor number, leftover
    __slots__ = frozenset(['_fn', '_tail'])

    def __init__(self, fd):
        """Start reading lines from *fd* (object exposing ``fileno()``)."""
        # initialising class
        gevent.queue.Queue.__init__(self)
        self._fn = fd.fileno()
        # using empty tail
        self._tail = ''
        # putting file to nonblocking mode
        gevent.os.make_nonblocking(fd)
        # starting main loop
        gevent.spawn(self._frobber)

    def _frobber(self):
        """Reader loop: queue complete lines until EOF, then queue ``None``."""
        while True:
            # reading one buffer from stream
            buf = gevent.os.nb_read(self._fn, 4096)
            # EOF found
            if len(buf) == 0:
                break
            # splitting stream by line ends
            rows = buf.decode('l1').split('\n')
            # adding tail to the first element if there is some tail
            if len(self._tail) > 0:
                rows[0] = self._tail + rows[0]
            # popping out last (incomplete) element
            self._tail = rows.pop(-1)
            # dropping all complete elements to the queue
            for row in rows:
                self.put_nowait(row)
                logger.info('< ' + row)
        # do not lose a last line that lacks a trailing newline
        if len(self._tail) > 0:
            self.put_nowait(self._tail)
            logger.info('< ' + self._tail)
            self._tail = ''
        # sending EOF
        self.put_nowait(None)
# Module-level line queue fed from standard input.
stdin = FReadlineQueue(sys.stdin)
# wrapper around a file handle that makes it possible to queue writes without stalling
class FWritelineQueue(gevent.queue.JoinableQueue):
    """Queue whose items are written, one per line, to a non-blocking file.

    ``put()`` enqueues an item and makes sure a writer greenlet running
    ``_frobber`` is active.  The writer converts each item with
    ``str(item).encode('utf-8')``, appends a newline and pushes it out
    through ``gevent.os.nb_write``; a partially written line is kept in
    ``_tail`` until the rest has been written.

    Only one writer greenlet runs at a time: ``_tail`` is shared state, and
    a second writer starting while the first one waits inside ``nb_write``
    would send the same bytes again.

    Each item is acknowledged with ``task_done()`` once it has been written
    completely, so ``join()`` (used by ``__del__``) returns when the pending
    output has been flushed.
    """
    # storing fileno, leftover, "writer greenlet is active" flag
    __slots__ = frozenset(['_fn', '_tail', '_writing'])

    def __init__(self, fd):
        """Prepare *fd* (object exposing ``fileno()``) for queued output."""
        # initialising class
        gevent.queue.JoinableQueue.__init__(self)
        # storing fileno
        self._fn = fd.fileno()
        # putting file to nonblocking mode
        gevent.os.make_nonblocking(fd)
        # using empty tail (None means "no partially written line")
        self._tail = None
        # no writer greenlet yet
        self._writing = False

    def __del__(self):
        """Wait for queued items to be written before the object goes away."""
        # purge queue before deleting
        if not self.empty():
            self.join()

    def put(self, item, block=True, timeout=None):
        """Enqueue *item* and start the writer greenlet if none is running.

        Arguments are those of ``gevent.queue.JoinableQueue.put``.
        """
        # calling real put
        gevent.queue.JoinableQueue.put(self, item, block, timeout)
        # starting main loop unless one is already draining the queue
        if not self._writing:
            self._writing = True
            gevent.spawn(self._frobber)

    def _frobber(self):
        """Writer loop: drain the queue to the file, then exit."""
        try:
            while True:
                # checking leftover
                if self._tail is None:
                    try:
                        item = self.get_nowait()
                    except gevent.queue.Empty:
                        return
                    # b'\n' is the same value as '\n' on Python 2 and keeps
                    # the concatenation valid where encode() returns bytes
                    self._tail = str(item).encode('utf-8') + b'\n'
                # writing tail
                written = gevent.os.nb_write(self._fn, self._tail)
                if written >= len(self._tail):
                    # whole line is out: acknowledge it so join() can complete
                    self._tail = None
                    self.task_done()
                else:
                    # short write: keep the remainder for the next round
                    self._tail = self._tail[written:]
        finally:
            # let the next put() start a fresh writer (also after an error)
            self._writing = False
# wrapper around database
class tagDB(object):
__slots__ = frozenset(['_cursor', '_db'])
def __init__(self):
config.section('database')
|
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
|
def __init__(self, queue, logger):
    """Set up the database wrapper, logging, request parser and output queue.

    queue  -- stored as ``self._queue``; how it is consumed is not visible
              in this section.
    logger -- logger object; ``info('started')`` is emitted immediately.
    """
    self._db = tagDB()
    self._log = logger
    self._log.info('started')
    # Raw string: same pattern text as before, but without relying on
    # Python passing unknown escapes (``\ ``, ``\/``, ``\w``, ``\.``) through.
    self._request = re.compile(r'^([0-9]+)\ ((http|ftp):\/\/)?([-\w.]+)(:[0-9]+)?(\/([^ ]*))?\ ([0-9.:]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST|CONNECT).*$')
    self._queue = queue
    # replies go to standard output; False is closefd, so the descriptor
    # is not closed together with the writer's FileIO
    self._stdout = FWritelineQueue(sys.stdout, False)
def process(self, id, site, ip_address, url_path, line = None):
#self._log.info('trying {}'.format(site))
result = self._db.check(site, ip_address)
reply = None
#self._log.info('got {} lines from database'.format(len(result)))
for row in result:
|
|
|
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
|
def __init__(self, queue, logger):
    """Set up the database wrapper, logging, request parser and output queue.

    queue  -- stored as ``self._queue``; how it is consumed is not visible
              in this section.
    logger -- logger object; ``info('started')`` is emitted immediately.
    """
    self._db = tagDB()
    self._log = logger
    self._log.info('started')
    # Raw string: same pattern text as before, but without relying on
    # Python passing unknown escapes (``\ ``, ``\/``, ``\w``, ``\.``) through.
    self._request = re.compile(r'^([0-9]+)\ ((http|ftp):\/\/)?([-\w.]+)(:[0-9]+)?(\/([^ ]*))?\ ([0-9.:]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST|CONNECT).*$')
    self._queue = queue
    # replies go to standard output through the queued line writer
    self._stdout = FWritelineQueue(sys.stdout)
def process(self, id, site, ip_address, url_path, line = None):
#self._log.info('trying {}'.format(site))
result = self._db.check(site, ip_address)
reply = None
#self._log.info('got {} lines from database'.format(len(result)))
for row in result:
|