Squid url redirector

Check-in [ed7808827d]
anonymous

Check-in [ed7808827d]

Overview
Comment:Finally reactor support. Plain and threaded are tested and working good. Kqueue not ready for use.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | master | trunk
Files: files | file ages | folders
SHA3-256: ed7808827d82157d37343048186ba5c5c0d33b3d72f4355f424c17cf822f8072
User & Date: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 on 2009-10-14 14:16:17.000
Other Links: branch diff | manifest | tags
Context
2009-10-27
15:27
now mark function returns id of site added check-in: 318311c7d2 user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 tags: master, trunk
2009-10-14
14:16
Finally reactor support. Plain and threaded are tested and working good. Kqueue not ready for use. check-in: ed7808827d user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 tags: master, trunk
09:04
Added 'or replace' for better function updating, reordered functions to work as found in file. check-in: 67e762b39b user: c.kworr@d4daf22a-8aaf-11de-a64d-234b64dd91b4 tags: master, trunk
Changes
16
17
18
19
20
21
22






16
17
18
19
20
21
22
23
24
25
26
27
28







+
+
+
+
+
+
password = password

# This section can be used to turn off some logging.
[log]

# No logs are generated if 'silent' is set to 'yes'.
silent = no

# This section controls which reactor services the main loop.
[reactor]

# Possible reactor types: plain, thread and kqueue (broken)
reactor = thread
45
46
47
48
49
50
51
52

53
54
55
56
57
58

59
60
61
62
63
64
65
45
46
47
48
49
50
51

52
53
54
55
56
57

58
59
60
61
62
63
64
65







-
+





-
+







		return(self._db)

	def check(self, site, ip_address):
		"""Run the stored check statement for a site and a client address.

		Both arguments are handed to self._check_stmt unchanged, and whatever
		that callable returns is passed straight back to the caller.
		"""
		matches = self._check_stmt(site, ip_address)
		return matches

# abstract class with basic checking functionality
class Checker:
	__slots__ = frozenset(['_db', '_log', '_queue'])
	__slots__ = frozenset(['_db', '_log'])

	def __init__(self):
		self._db = tagDB()
		self._log = Logger()

	def process(self, id, site, ip_address, url_path):
	def process(self, id, site, ip_address, url_path, line = None):
		self._log.info('trying {}\n'.format(site))
		result = self._db.check(site, ip_address)
		#reply = '{}://{}/{}'.format(req[4], req[1], req[3])
		reply = '-'
		for row in result:
			if row != None and row[0] != None:
				if row[1] != None:
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92
93
94
95
96
97
98
99







100
101
102

103
104

105

106
107
108
109
110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125











126
127
128
129
130
131
132































































133
134
135
136
137
138
139
78
79
80
81
82
83
84

85


86
87
88
89



90
91
92
93
94
95
96
97
98
99
100
101
102
103

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123







124
125
126
127
128
129
130
131
132
133
134
135






136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205







-
+
-
-




-
-
-





+
+
+
+
+
+
+


-
+


+

+





+








-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+

-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







		request = re.compile('^([0-9]+)\ (http|ftp):\/\/([-\w.:]+)\/([^ ]*)\ ([0-9.]+)\/(-|[\w\.]+)\ (-|\w+)\ (-|GET|HEAD|POST).*$').match(line)
		if request:
			id = request.group(1)
			#proto = request.group(2)
			site = request.group(3)
			url_path = request.group(4)
			ip_address = request.group(5)
			self.insert(id, site, ip_address, url_path)
			self.process(id, site, ip_address, url_path, line)

			self._log.info('request {} queued ({})\n'.format(id, line))
		else:
			self._log.info('bad request\n')
			self.writeline(line)

	def insert(self, id, site, ip_address, url_path):
		"""Append one request to the end of self._queue as a 4-tuple."""
		request = (id, site, ip_address, url_path)
		self._queue.append(request)

	def writeline(self, string):
		"""Log the outgoing text, then write it to stdout and flush at once."""
		message = 'sending: ' + string
		self._log.info(message)
		out = sys.stdout
		out.write(string)
		# flush so the reply is not held back in the output buffer
		out.flush()

	def loop(self):
		"""Feed stdin to self.check one line at a time until end of input."""
		line = sys.stdin.readline()
		# readline() hands back an empty string only at end of input
		while line:
			self.check(line)
			line = sys.stdin.readline()

# threaded checking facility
class CheckerThread(Checker):
	"""Checker variant that queues requests and works them off in a second thread.

	NOTE(review): several members below appear twice in slightly different
	forms (two __slots__ assignments, two dequeue sequences in _start, both
	insert and process). This looks like two revisions of the class laid over
	each other -- confirm against the repository before relying on it.
	"""
	# the second assignment rebinds the name, so only it takes effect
	__slots__ = frozenset(['_lock', '_lock_queue'])
	__slots__ = frozenset(['_lock', '_lock_exit', '_lock_queue', '_queue'])

	def __init__(self):
		"""Create the locks and the request queue, then start the worker thread."""
		# basic initialisation
		Checker.__init__(self)

		# Wake-up lock. The worker acquires it at the top of every iteration and,
		# while holding the queue lock, releases it again when more than one
		# request is waiting. This way the thread proceeds without stops while
		# the queue has data and stalls when no data is present. The lock is
		# released by the queue writer after storing something into the queue.
		self._lock = _thread.allocate_lock()
		# taken when a request is queued, released by the worker once the queue
		# is empty; loop() acquires it before returning
		self._lock_exit = _thread.allocate_lock()
		# held around every access to self._queue
		self._lock_queue = _thread.allocate_lock()
		# start with the wake-up lock held so the worker blocks until a writer releases it
		self._lock.acquire()
		self._queue = []
		_thread.start_new_thread(self._start, ())

	def _start(self):
		"""Worker thread body: wait for the wake-up lock, dequeue and process, forever."""
		while True:
			# blocks until a writer (or this loop itself) has released the wake-up lock
			self._lock.acquire()
			self._lock_queue.acquire()
			# yes this should be written this way, and yes, this is why I hate threading
			# more than one request waiting: release the wake-up lock again so a
			# later acquire() does not block
			if len(self._queue) > 1 and self._lock.locked():
				self._lock.release()
			req = self._queue.pop(0)
			self._lock_queue.release()
			# NOTE(review): process() in this class declares 'line' without a
			# default, but only four arguments are passed here -- verify
			self.process(req[0], req[1], req[2], req[3])
			# NOTE(review): this is a second dequeue within the same iteration,
			# after only one acquire of the wake-up lock -- verify it is intended
			with self._lock_queue:
				# yes this should be written this way, and yes, this is why I hate threading
				if len(self._queue) > 1:
					if self._lock.locked():
						self._lock.release()
				req = self._queue.pop(0)
			# call the base implementation directly; this class's own process() only queues
			Checker.process(self, req[0], req[1], req[2], req[3])
			with self._lock_queue:
				# queue is empty: release the exit lock that loop() waits on
				if len(self._queue) == 0:
					if self._lock_exit.locked():
						self._lock_exit.release()

	def insert(self, id, site, ip_address, url_path):
		"""Queue a request through Checker.insert while holding the queue lock."""
		self._lock_queue.acquire()
		Checker.insert(self, id, site, ip_address, url_path)
		# release the wake-up lock if it is held so the worker's acquire() can proceed
		if self._lock.locked():
			self._lock.release()
		self._lock_queue.release()
	def process(self, id, site, ip_address, url_path, line):
		"""Queue a request for the worker thread instead of handling it inline.

		line is only used in the log message.
		"""
		with self._lock_queue:
			self._queue.append((id, site, ip_address, url_path))
			self._log.info('request {} queued ({})\n'.format(id, line))
			# take the exit lock, unless it is already held, so that loop() blocks
			# until the worker releases it
			if not self._lock_exit.locked():
				self._lock_exit.acquire()
			# release the wake-up lock if it is held so the worker's acquire() can proceed
			if self._lock.locked():
				self._lock.release()

	def loop(self):
		"""Pass stdin line by line to check() until EOF, then wait on the exit lock."""
		while True:
			line = sys.stdin.readline()
			# an empty string means end of input
			if len(line) == 0:
				break
			self.check(line)
		# blocks while the exit lock is held, i.e. until the worker has released
		# it after finding the queue empty
		self._lock_exit.acquire()

# kqueue-enabled class for BSDs. XXX: broken for now
class CheckerKqueue(Checker):
	"""Checker variant that uses kqueue to alternate between reading and processing.

	process() below only queues a request. loop() polls stdin through kqueue:
	it blocks while nothing is queued, and polls without blocking while
	requests are pending so that one queued request is handled between reads.
	"""
	__slots__ = frozenset(['_kq', '_select', '_queue'])

	def __init__(self):
		"""Create the kqueue, register stdin for read events and set up the queue."""
		# basic initialisation
		Checker.__init__(self)

		# importing select module here rather than at module level, presumably
		# because kqueue is not available on every platform
		import select
		self._select = select

		# creating kqueue
		self._kq = self._select.kqueue()
		assert (self._kq.fileno() != -1)

		# watching sys.stdin for data
		self._kq.control([self._select.kevent(sys.stdin, self._select.KQ_FILTER_READ, self._select.KQ_EV_ADD)], 0)

		# creating data queue
		self._queue = []

	def loop(self):
		"""Serve requests until stdin reaches end of input, then return.

		Each iteration either reads one line and passes it to check(), or, when
		kqueue reports nothing to read, handles the oldest queued request.
		Requests still queued at end of input are processed before returning.
		"""
		# Wait for data by default
		timeout = None
		while True:
			# checking if there is any data
			kevs = self._kq.control(None, 1, timeout)
			if len(kevs) > 0:
				# XXX add some code to read only known data size and check for newlines
				line = sys.stdin.readline()
				if len(line) == 0:
					# end of input: stop polling, the remaining queue is drained below
					break
				# add data to the queue
				self.check(line)
			elif len(self._queue) > 0:
				# guarded: a line that check() rejected queues nothing, so the
				# queue may be empty when the non-blocking poll comes back idle
				req = self._queue.pop(0)
				Checker.process(self, req[0], req[1], req[2], req[3])
			if len(self._queue) > 0:
				# don't wait for data, keep processing
				timeout = 0
			else:
				# wait for data - we have nothing to process
				timeout = None

		# answer everything that was queued before stdin was closed
		while len(self._queue) > 0:
			req = self._queue.pop(0)
			Checker.process(self, req[0], req[1], req[2], req[3])

	def process(self, id, site, ip_address, url_path, line):
		"""Queue a request for loop() to handle later; line is only logged."""
		self._queue.append((id, site, ip_address, url_path))
		self._log.info('request {} queued ({})\n'.format(id, line))

# this class processes the config file and substitutes default values
class Config:
	__slots__ = frozenset(['_config', '_default', '_section'])
	_default = {
		'reactor': {
			'reactor': 'thread',
183
184
185
186
187
188
189




190
191
192
193
194
195

249
250
251
252
253
254
255
256
257
258
259
260





261







+
+
+
+

-
-
-
-
-
+

# initializing and reading in config file
config = Config()

# pick the checker implementation named by the 'reactor' option
config.section('reactor')
if config['reactor'] == 'thread':
	checker = CheckerThread()
elif config['reactor'] == 'plain':
	checker = Checker()
elif config['reactor'] == 'kqueue':
	checker = CheckerKqueue()
else:
	# fail with a clear message instead of a NameError on 'checker' below
	raise SystemExit('unknown reactor type: {}'.format(config['reactor']))

# loop() itself reads stdin and passes each line to check(), so no separate
# read loop is needed here
checker.loop()