Index: samesite.py
==================================================================
--- samesite.py
+++ samesite.py
@@ -1,160 +1,576 @@
 #!/usr/bin/env python3.1

 import datetime, http.cookiejar, optparse, os, sys, shelve, re, urllib.request
+
+from spacemap import SpaceMap

 parser = optparse.OptionParser()
 parser.add_option('-v', '--verbose', action = 'store_true', dest = 'verbose', help = 'turns on verbose status notifications', metavar = 'bool', default = False)
 parser.add_option('-d', '--dir', action = 'store', dest = 'dir', help = 'specify directory where the files should be stored', metavar = 'string', default = None)
 parser.add_option('-r', '--root', action = 'store', dest = 'root', help = 'specify a site from which data should be mirrored', metavar = 'string', default = None)
 parser.add_option('-l', '--log', action = 'store', dest = 'log', help = 'specify a log file to process', metavar = 'string', default = None)
 parser.add_option('-e', '--skip-etag', action = 'store_true', dest = 'noetag', help = 'do not process etags', metavar = 'bool', default = False)
+parser.add_option('-p', '--port', action = 'store', dest = 'port', help = 'listen on this port for incoming connections', metavar = 'integer', default = None)
+parser.add_option('-n', '--no-update', action = 'store_true', dest = 'noupdate', help = 'do not update already downloaded files', metavar = 'bool', default = False)
 (options, args) = parser.parse_args()

 assert options.dir, 'Directory not specified'
 assert options.root, 'Server not specified'
-assert options.log, 'Log file not specified'
-assert os.access(options.log, os.R_OK), 'Log file unreadable'
+assert options.log or options.port, 'Log file or port not specified'
+assert options.port or os.access(options.log, os.R_OK), 'Log file unreadable'
+
+# strip a trailing slash from the storage directory, if any
+optionsDirWithSep = re.compile('^(.*?)/?$').match(options.dir)
+if optionsDirWithSep:
+	options.dir = optionsDirWithSep.group(1)

 # this is file index - everything is stored in this file
-index = shelve.open(options.dir + '/.index')
+# _parts - SpaceMap of byte ranges not yet downloaded (an empty map means the file is complete)
+# _time - last time the file was checked
+# everything else is just the headers
+index = shelve.open(options.dir + os.sep + '.index')

 desc_fields = ('Content-Length', 'Pragma', 'Last-Modified')
 ignore_fields = ('Accept-Ranges', 'Age', 'Cache-Control', 'Connection', 'Content-Type', 'Date', 'Expires', 'Server', 'Via', 'X-Cache', 'X-Cache-Lookup', 'X-Powered-By')

 if not options.noetag:
 	desc_fields += 'ETag',
 else:
 	ignore_fields += 'ETag',

-block_size = 32768
-
-while True:
-	unchecked_files = set()
-	checked_files = 0
-
-	# reading log and storing found urls for processing
-	# check file mtime XXX
-	with open(options.log, 'r') as log_file:
-		log_line = re.compile('^[^ ]+ - - \[.*] "(GET|HEAD) (.*?)(\?.*)? HTTP/1.1" (\d+) \d+ "(.*)" "(.*)"$')
-		for line in log_file:
-			this_line = log_line.match(line.strip())
-			if this_line:
-				unchecked_files.add(this_line.group(2))
-
-	for url in unchecked_files:
-		reload = False
-		recheck = False
-		info = 'Checking file: ' + url
-
-		# creating empty placeholder in index
-		if not url in index:
-			info += '\nThis one is new.'
-			index[url] = {}
-			reload = True
-
-		# creating file name from url
-		file_name = options.dir + re.compile('%20').sub(' ', url)
-
-		# forcibly checking file if no file present
-		if not reload and not os.access(file_name, os.R_OK):
-			info += '\nFile not found or inaccessible.'
-			reload = True
-
-		# forcibly checking file if file size doesn't match with index data
-		elif not reload and 'Content-Length' in index[url] and os.stat(file_name).st_size != int(index[url]['Content-Length']):
-			info += '\nFile size is ' + os.stat(file_name).st_size + ' and stored file size is ' + index[url]['Content-Length'] + '.'
-			reload = True
-
-		# forcibly checking file if index hods Pragma header
-		if not reload and 'Pragma' in index[url] and index[url]['Pragma'] == 'no-cache':
-			info +='\nPragma on: recheck imminent.'
-			recheck = True
-
-		if options.verbose:
-			print(info)
-
-		# skipping file processing if there's no need to recheck it and we have checked it at least 4 hours ago
-		if not recheck and not reload and '__time__' in index[url] and (datetime.datetime.now() - datetime.timedelta(hours = 4) - index[url]['__time__']).days < 0:
-			continue
-
-		try:
-			with urllib.request.urlopen(options.root + url) as source:
-				new_headers = {}
-				headers = source.info()
-				if not options.verbose:
-					print(info)
-
-				# stripping unneeded headers (XXX make this inplace?)
-				for header in headers:
-					if header in desc_fields:
-						if header == 'Pragma' and headers[header] != 'no-cache':
-							print('Pragma:', headers[header])
-						new_headers[header] = headers[header]
-					elif not header in ignore_fields:
-						print('Undefined header "', header, '": ', headers[header], sep='')
-
-				# comparing headers with data found in index
-				# if any header has changed (except Pragma) file is fully downloaded
-				# same if we get more or less headers
-				old_keys = set(index[url].keys())
-				old_keys.discard('__time__')
-				old_keys.discard('Pragma')
-				more_keys = set(new_headers.keys()) - old_keys
-				more_keys.discard('Pragma')
-				less_keys = old_keys - set(new_headers.keys())
-				if len(more_keys) > 0:
-					if not len(old_keys) == 0:
-						print('More headers appear:', more_keys)
-					reload = True
-				elif len(less_keys) > 0:
-					print('Less headers appear:', less_keys)
-				else:
-					for key in index[url].keys():
-						if key not in ('__time__', 'Pragma') and not index[url][key] == new_headers[key]:
-							print('Header "', key, '" changed from [', index[url][key], '] to [', new_headers[key], ']', sep='')
-							reload = True
-
-				# downloading file
-				if reload:
-					if 'Content-Length' in headers:
-						print('Downloading', headers['Content-Length'], 'bytes [', end='')
-					else:
-						print('Downloading [', end='')
-					sys.stdout.flush()
-
-					# file is created at temporary location and moved in place only when download completes
-					temp_file = open(options.dir + '/.tmp', 'wb')
-					buffer = source.read(block_size)
-					blocks = 0
-					megs = 0
-					while len(buffer) > 0:
-						temp_file.write(buffer)
-						print('.', end='')
-						sys.stdout.flush()
-						buffer = source.read(block_size)
-						blocks += 1
-						if blocks > 1024*1024/block_size:
-							blocks = blocks - 1024*1024/block_size
-							megs += 1
-							print('{}Mb'.format(megs), end='')
-					temp_file.close()
-					print(']')
-					os.renames(options.dir + '/.tmp', file_name)
-
-				checked_files += 1
-
-				# storing new time mark and storing new headers
-				new_headers['__time__'] = datetime.datetime.now()
-				index[url] = new_headers
-				index.sync()
-
-		except urllib.error.HTTPError as error:
-			# in case of error we don't need to do anything actually,
-			# if file download stalls or fails the file would not be moved to it's location
-			print(error)
-
-	if options.verbose:
-		print('[', len(unchecked_files), '/', checked_files, ']')
-
-	# checking if there were any files downloaded, if yes - restarting sequence
-	if checked_files == 0:
-		break
+block_size = 4096
+
+temp_file_name = options.dir + os.sep + '.tmp'
+
+'''
+# a kqueue-based implementation would be a good fit here, but that is for later
+class Connection:
+	__slots__ = frozenset(('__address', '__input', '__socket', '__status', 'error', 'method', 'url', 'http_version'))
+
+	def __init__(self, socket, address):
+		self.__address = address
+		self.__input = b''
+		self.__socket = socket
+		self.__status = 0
+
+	def read(self, kev):
+		buffer = self.__socket.recv(kev.data)
+		exhausted = False
+		if len(buffer) == 0:
+			eof = True
+		else:
+			self.__input += buffer
+		while not exhausted:
+			if self.__status == -1:
+				exhausted = True
+			elif self.__status == 0:
+				endstring = self.__input.find(b'\n')
+				if endstring > 0:
+					print('Processing request line.')
+					line = self.__input[:endstring].decode('ascii')
+					self.__input = self.__input[endstring + 1:]
+					isRequest = re.compile('(GET) ([^ ]+) HTTP/(1\.0)').match(line)
+					if not isRequest:
+						self.error = 'Not an HTTP connection.'
+						self.__status = -1
+					else:
+						self.method = isRequest.group(1)
+						self.url = isRequest.group(2)
+						self.http_version = isRequest.group(3)
+						self.__status = 1
+				else:
+					exhausted = True
+			elif self.__status == 1:
+				endstring = self.__input.find(b'\n')
+				if endstring > 0:
+					print('Processing header line: ' + repr(self.__input))
+					line = self.__input[:endstring].decode('ascii')
+					self.__input = self.__input[endstring + 1:]
+					isHeader = re.compile('([^:]*): +(.*)').match(line)
+					if not isHeader:
+						self.error = 'Bad header.'
+						return False
+					# process header here
+				elif endstring == 0:
+					self.__status = 2
+				else:
+					exhausted = True
+
+	def write(self, kev):
+		pass
+
+if options.port:
+	import select, socket
+
+	sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+	try:
+		sock.bind(('127.0.0.1', int(options.port)))
+		sock.listen(-1)
+
+		kq = select.kqueue()
+		assert kq.fileno() != -1, "Fatal error: can't initialise kqueue."
+
+		kq.control([select.kevent(sock, select.KQ_FILTER_READ, select.KQ_EV_ADD)], 0)
+		timeout = None
+
+		connections = {sock.fileno(): None}
+
+		while True:
+			kevs = kq.control(None, 1, timeout)
+
+			for kev in kevs:
+				if type(connections[kev.ident]) == Connection:
+					print(kev.ident, kev.data, kev.filter, kev.flags)
+					assert kev.data != 0, 'No data available.'
+					if kev.filter == select.KQ_FILTER_READ:
+						connections[kev.ident].read(kev)
+					elif kev.filter == select.KQ_FILTER_WRITE:
+						connections[kev.ident].write(kev)
+					else:
+						assert kev.filter in (select.KQ_FILTER_READ, select.KQ_FILTER_WRITE), 'Do we support other filters?'
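+				# an event on a fileno we track as None can only be the listening
+				# socket itself: accept the connection and register it with kqueue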
+				else:
+					(conn, addr) = sock.accept()
+					print('Connection from ' + repr(addr))
+					kq.control([select.kevent(conn, select.KQ_FILTER_READ, select.KQ_EV_ADD)], 0)
+					connections[conn.fileno()] = Connection(conn, addr)
+
+				if kev.flags >> 15 == 1:
+					kq.control([select.kevent(kev.ident, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
+					kq.control([select.kevent(kev.ident, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
+					del connections[kev.ident]
+	finally:
+		sock.close()
+'''
+
+if options.port:
+	import http.server
+
+	class MyRequestHandler(http.server.BaseHTTPRequestHandler):
+		def __process(self):
+			# reload means the file needs to be re-fetched to serve this request
+			reload = False
+			# recheck means the file needs to be checked; if the file has been modified we can still serve the older copy
+			recheck = False
+			# file_stat is set only when the file definitely exists
+			file_stat = None
+			# requested_ranges holds data about any Range header in the request
+			requested_ranges = None
+			# record holds this file's index data locally; it is written back upon successful completion
+			record = None
+			info = 'Checking file: ' + self.path
+
+			proxy_ignored = ('Accept', 'Accept-Encoding',
+				'Cache-Control', 'Connection',
+				'Host',
+				'User-Agent',
+				'Via',
+				'X-Forwarded-For',
+			)
+
+			print('Command:', self.command)
+
+			for header in self.headers:
+				if header in proxy_ignored:
+					pass
+				elif header in ('Range',):
+					isRange = re.compile('bytes=(\d+)-(\d+)').match(self.headers[header])
+					if isRange:
+						requested_ranges = SpaceMap({int(isRange.group(1)): int(isRange.group(2)) + 1})
+					else:
+						return
+				else:
+					print('Unknown header - ', header, ': ', self.headers[header], sep='')
+					return
+				print(header, self.headers[header])
+			print(self.path)
+
+			# creating empty placeholder in index
+			# if there's no space map and no file in the real directory - we have no file
+			# if the space map is empty - the file is complete
+			# the space map covers every bit of the file we don't currently possess
+			if not self.path in index:
+				info += '\nThis one is new.'
+				reload = True
+				record = {'_parts': None}
+			else:
+				record = index[self.path]
+				if '_parts' in index[self.path]:
+					print(record['_parts'])
+					if index[self.path]['_parts'] == {0: -1}:
+						index[self.path]['_parts'] = None
+
+			# creating file name from self.path
+			file_name = options.dir + os.sep + re.compile('%20').sub(' ', self.path)
+			# partial file or unfinished download
+			temp_name = options.dir + os.sep + '.parts' + re.compile('%20').sub(' ', self.path)
+
+			# forcibly checking the file if no file is present
+			if os.access(file_name, os.R_OK):
+				file_stat = os.stat(file_name)
+			elif '_parts' in record and os.access(temp_name, os.R_OK):
+				file_stat = os.stat(temp_name)
+			elif not reload:
+				info += '\nFile not found or inaccessible.'
+				reload = True
+
+			# forcibly checking the file if its size doesn't match the index data
+			if not reload:
+				if '_parts' in record and record['_parts'] == SpaceMap():
+					if 'Content-Length' in record and file_stat and file_stat.st_size != int(record['Content-Length']):
+						info += '\nFile size is {} and stored file size is {}.'.format(file_stat.st_size, record['Content-Length'])
+						reload = True
+
+			# forcibly checking the file if the index holds a Pragma header
+			if not reload and 'Pragma' in record and record['Pragma'] == 'no-cache':
+				info += '\nPragma on: recheck imminent.'
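+				# (a 'Pragma: no-cache' entry is revalidated upstream on every request;
+				# the cached copy can still be served once the check succeeds)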
+				recheck = True
+
+			# recheck the file if the last check was more than 4 hours ago
+			if not recheck and not reload and '_time' in record and (datetime.datetime.now() - datetime.timedelta(hours = 4) - record['_time']).days >= 0:
+				recheck = True
+
+			print(info)
+			if reload or recheck:
+
+				try:
+					request = options.root + self.path
+					if requested_ranges is not None:
+						if '_parts' in record and record['_parts'] is not None:
+							needed = record['_parts'] & requested_ranges
+						else:
+							needed = requested_ranges
+						ranges = ()
+						print('Not stored ranges:', record['_parts'])
+						print('Requested ranges:', requested_ranges)
+						print('Needed ranges:', needed)
+						needed.rewind()
+						while True:
+							pair = needed.pop()
+							if pair[0] is None:
+								break
+							ranges += '{}-{}'.format(pair[0], pair[1] - 1),
+						print('Requesting ranges:', ranges)
+						request = urllib.request.Request(request, headers = {'Range': 'bytes=' + ','.join(ranges)})
+
+					with urllib.request.urlopen(request) as source:
+						new_record = {}
+						new_record['_parts'] = record['_parts']
+						headers = source.info()
+
+						# stripping unneeded headers (XXX make this inplace?)
+						for header in headers:
+							if header in desc_fields:
+								#if header == 'Pragma' and headers[header] != 'no-cache':
+								print(header, headers[header])
+								if header == 'Content-Length':
+									if 'Content-Range' not in headers:
+										new_record[header] = headers[header]
+								else:
+									new_record[header] = headers[header]
+							elif header == 'Content-Range':
+								range_header = re.compile('^bytes (\d+)-(\d+)/(\d+)$').match(headers[header])
+								if range_header:
+									new_record['Content-Length'] = range_header.group(3)
+								else:
+									assert False, 'Content-Range unrecognized.'
+							elif not header in ignore_fields:
+								print('Undefined header "', header, '": ', headers[header], sep='')
+
+						if new_record['_parts'] is None:
+							new_record['_parts'] = SpaceMap({0: int(new_record['Content-Length'])})
+						print(new_record)
+
+						# comparing headers with data found in index
+						# if any header (except Pragma) has changed the file is fully downloaded
+						# same if we get more or fewer headers
+						old_keys = set(record.keys())
+						old_keys.discard('_time')
+						old_keys.discard('Pragma')
+						more_keys = set(new_record.keys()) - old_keys
+						more_keys.discard('Pragma')
+						less_keys = old_keys - set(new_record.keys())
+						if len(more_keys) > 0:
+							if not len(old_keys) == 0:
+								print('More headers appear:', more_keys)
+							reload = True
+						elif len(less_keys) > 0:
+							print('Fewer headers appear:', less_keys)
+						else:
+							for key in record.keys():
+								if key[0] != '_' and key != 'Pragma' and not record[key] == new_record[key]:
+									print('Header "', key, '" changed from [', record[key], '] to [', new_record[key], ']', sep='')
+									reload = True
+
+						if reload:
+							print('Reloading.')
+							if os.access(temp_name, os.R_OK):
+								os.unlink(temp_name)
+							if os.access(file_name, os.R_OK):
+								os.unlink(file_name)
+
+						# downloading file or segment
+						if 'Content-Length' in new_record:
+							if requested_ranges is None:
+								requested_ranges = new_record['_parts']
+							else:
+								if len(requested_ranges) > 1:
+									print("Multipart requests currently not supported.")
+									assert False, 'Skip this one for now.'
+						else:
+							assert False, 'No Content-Length or Content-Range header.'
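+
+						# A sketch of the assumed SpaceMap semantics (spacemap.py is not part
+						# of this patch): a map of {start: end} byte ranges still missing,
+						# where '&' intersects, '-' subtracts, rewind()/pop() iterate the
+						# ranges (pop() yields a pair whose first element is None at the end),
+						# and an empty SpaceMap() means nothing is missing, e.g.:
+						#   parts = SpaceMap({0: 100})         # all 100 bytes missing
+						#   parts = parts - SpaceMap({0: 50})  # first half now on disk
+						#   parts == SpaceMap()                # True only when complete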
+
+						if reload:
+							new_record['_time'] = datetime.datetime.now()
+							if self.command not in ('HEAD',):
+								# file is created at temporary location and moved in place only when download completes
+								if not os.access(temp_name, os.R_OK):
+									empty_name = options.dir + os.sep + '.tmp'
+									with open(empty_name, 'w+b') as some_file:
+										pass
+									os.renames(empty_name, temp_name)
+								temp_file = open(temp_name, 'r+b')
+								requested_ranges.rewind()
+								while True:
+									(start, end) = requested_ranges.pop()
+									if start is None:
+										break
+									stream_last = start
+									# snapshot (not alias) the record so the index never claims bytes that are still in flight
+									old_record = dict(new_record)
+									if end - start < block_size:
+										req_block_size = end - start
+									else:
+										req_block_size = block_size
+									buffer = source.read(req_block_size)
+									print(buffer)
+									length = len(buffer)
+									while length > 0 and stream_last < end:
+										stream_pos = stream_last + length
+										assert not stream_pos > end, 'Received more data than requested: pos:{} start:{} end:{}.'.format(stream_pos, start, end)
+										print('Writing', length, 'bytes to temp file at position', stream_last)
+										temp_file.seek(stream_last)
+										temp_file.write(buffer)
+										new_record['_parts'] = new_record['_parts'] - SpaceMap({stream_last: stream_pos})
+										print(new_record)
+										index[self.path] = old_record
+										index.sync()
+										old_record = dict(new_record)
+										stream_last = stream_pos
+										if end - stream_last < block_size:
+											req_block_size = end - stream_last
+										buffer = source.read(req_block_size)
+										print(buffer)
+										length = len(buffer)
+								print(new_record)
+								index[self.path] = new_record
+								index.sync()
+								temp_file.close()
+
+								# moving downloaded data to real file
+								if new_record['_parts'] == SpaceMap():
+									if type(request) != str:
+										# just moving
+										# drop old dirs XXX
+										print('Moving temporary file to new destination.')
+										os.renames(temp_name, file_name)
+
+				except urllib.error.HTTPError as error:
+					# in case of error we don't actually need to do anything:
+					# if a download stalls or fails the file is simply not moved to its place
+					print(error)
+
+			if self.command == 'HEAD':
+				self.send_response(200)
+				if 'Content-Length' in index[self.path]:
+					self.send_header('Content-Length', index[self.path]['Content-Length'])
+				self.send_header('Accept-Ranges', 'bytes')
+				self.send_header('Content-Type', 'application/octet-stream')
+				if 'Last-Modified' in index[self.path]:
+					self.send_header('Last-Modified', index[self.path]['Last-Modified'])
+				self.end_headers()
+			else:
+				if index[self.path]['_parts'] != SpaceMap():
+					file_name = temp_name
+
+				with open(file_name, 'rb') as real_file:
+					file_stat = os.stat(file_name)
+					self.send_response(200)
+					self.send_header('Last-Modified', index[self.path]['Last-Modified'])
+					if requested_ranges is not None:
+						ranges = ()
+						requested_ranges.rewind()
+						while True:
+							pair = requested_ranges.pop()
+							if pair[0] is None:
+								break
+							ranges += '{}-{}'.format(pair[0], str(pair[1] - 1)),
+						self.send_header('Content-Range', 'bytes ' + ','.join(ranges) + '/' + index[self.path]['Content-Length'])
+					else:
+						self.send_header('Content-Length', str(file_stat.st_size))
+						requested_ranges = SpaceMap({0: file_stat.st_size})
+					self.send_header('Content-Type', 'application/octet-stream')
+					self.end_headers()
+					if self.command in ('GET',):
+						requested_ranges.rewind()
+						(start, end) = requested_ranges.pop()
+						print('Seeking file to position', start)
+						real_file.seek(start)
+						if block_size > end - start:
+							req_block_size = end - start
+						else:
+							req_block_size = block_size
+						print('block_size is', req_block_size)
+						buffer = real_file.read(req_block_size)
+						length = len(buffer)
+						while length > 0:
+							self.wfile.write(buffer)
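+							# advance past the bytes just sent; the reads below shrink
+							# to whatever remains of the requested range so we never overrun it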
+							start += len(buffer)
+							if req_block_size > end - start:
+								req_block_size = end - start
+							if req_block_size == 0:
+								break
+							print('block_size is', req_block_size)
+							buffer = real_file.read(req_block_size)
+							length = len(buffer)
+
+		def do_HEAD(self):
+			return self.__process()
+		def do_GET(self):
+			return self.__process()
+
+	server = http.server.HTTPServer(('127.0.0.1', int(options.port)), MyRequestHandler)
+	server.serve_forever()
+
+else:
+	while True:
+		unchecked_files = set()
+		checked_files = 0
+
+		# reading log and storing found urls for processing
+		# check file mtime XXX
+		with open(options.log, 'r') as log_file:
+			log_line = re.compile('^[^ ]+ - - \[.*] "(GET|HEAD) (.*?)(\?.*)? HTTP/1.1" (\d+) \d+ "(.*)" "(.*)"$')
+			for line in log_file:
+				this_line = log_line.match(line.strip())
+				if this_line:
+					unchecked_files.add(this_line.group(2))
+
+		for url in unchecked_files:
+			reload = False
+			recheck = False
+			info = 'Checking file: ' + url
+
+			# creating empty placeholder in index
+			if not url in index:
+				info += '\nThis one is new.'
+				index[url] = {}
+				reload = True
+
+			# creating file name from url
+			file_name = options.dir + re.compile('%20').sub(' ', url)
+
+			# forcibly checking file if no file present
+			if not reload and not os.access(file_name, os.R_OK):
+				info += '\nFile not found or inaccessible.'
+				reload = True
+
+			# forcibly checking file if file size doesn't match the index data
+			elif not reload and 'Content-Length' in index[url] and os.stat(file_name).st_size != int(index[url]['Content-Length']):
+				info += '\nFile size is {} and stored file size is {}.'.format(os.stat(file_name).st_size, index[url]['Content-Length'])
+				reload = True
+
+			# forcibly checking file if index holds Pragma header
+			if not reload and 'Pragma' in index[url] and index[url]['Pragma'] == 'no-cache':
+				info += '\nPragma on: recheck imminent.'
+				recheck = True
+
+			# skipping file processing if there's no need to recheck it and it was checked within the last 4 hours
+			if not recheck and not reload and (options.noupdate or ('_time' in index[url] and (datetime.datetime.now() - datetime.timedelta(hours = 4) - index[url]['_time']).days < 0)):
+				if options.verbose:
+					print(info)
+				continue
+			else:
+				print(info)
+
+			try:
+				with urllib.request.urlopen(options.root + url) as source:
+					new_headers = {}
+					headers = source.info()
+
+					# stripping unneeded headers (XXX make this inplace?)
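+					# (headers named in desc_fields are kept in the index record,
+					# ignore_fields are dropped silently, anything else is flagged below)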
+					for header in headers:
+						if header in desc_fields:
+							if header == 'Pragma' and headers[header] != 'no-cache':
+								print('Pragma:', headers[header])
+							new_headers[header] = headers[header]
+						elif not header in ignore_fields:
+							print('Undefined header "', header, '": ', headers[header], sep='')
+
+					# comparing headers with data found in index
+					# if any header (except Pragma) has changed the file is fully downloaded
+					# same if we get more or fewer headers
+					old_keys = set(index[url].keys())
+					old_keys.discard('_time')
+					old_keys.discard('Pragma')
+					more_keys = set(new_headers.keys()) - old_keys
+					more_keys.discard('Pragma')
+					less_keys = old_keys - set(new_headers.keys())
+					if len(more_keys) > 0:
+						if not len(old_keys) == 0:
+							print('More headers appear:', more_keys)
+						reload = True
+					elif len(less_keys) > 0:
+						print('Fewer headers appear:', less_keys)
+					else:
+						for key in index[url].keys():
+							if key[0] != '_' and key != 'Pragma' and not index[url][key] == new_headers[key]:
+								print('Header "', key, '" changed from [', index[url][key], '] to [', new_headers[key], ']', sep='')
+								reload = True
+
+					# downloading file
+					if reload:
+						if 'Content-Length' in headers:
+							print('Downloading', headers['Content-Length'], 'bytes [', end='')
+						else:
+							print('Downloading [', end='')
+						sys.stdout.flush()
+
+						# file is created at temporary location and moved in place only when download completes
+						temp_file = open(options.dir + os.sep + '.tmp', 'wb')
+						buffer = source.read(block_size)
+						megablocks = 0
+						blocks = 0
+						megs = 0
+						while len(buffer) > 0:
+							temp_file.write(buffer)
+							buffer = source.read(block_size)
+							blocks += 1
+							if blocks > 102400/block_size:
+								megablocks += 1
+								if megablocks > 10:
+									megablocks = megablocks - 10
+									megs += 1
+									print('{}Mb'.format(megs), end='')
+								else:
+									print('.', end='')
+								blocks = blocks - 102400/block_size
+								sys.stdout.flush()
+						temp_file.close()
+						print(']')
+						os.renames(options.dir + os.sep + '.tmp', file_name)
+
+					checked_files += 1
+
+					# storing new time mark and storing new headers
+					new_headers['_time'] = datetime.datetime.now()
+					index[url] = new_headers
+					index.sync()
+
+			except urllib.error.HTTPError as error:
+				# in case of error we don't actually need to do anything:
+				# if a download stalls or fails the file is simply not moved to its place
+				print(error)
+
+		if options.verbose:
+			print('[', len(unchecked_files), '/', checked_files, ']')
+
+		# checking if any files were downloaded; if yes - restart the sequence
+		if checked_files == 0:
+			break
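+
+# Example invocations (a sketch; the paths and URLs below are illustrative,
+# not taken from the original documentation):
+#   mirror from a log:      samesite.py -d /srv/mirror -r http://upstream.example -l access.log
+#   run as a caching proxy: samesite.py -d /srv/mirror -r http://upstream.example -p 8080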