NNTP to IMAP duplicator

nntpdup.py at tip
anonymous

nntpdup.py at tip

File nntpdup.py from the latest check-in


#!/usr/bin/env python3

import configparser, email.utils, getpass, imaplib, nntplib, re, sqlite3, sys
# Raise the maximum protocol line length accepted by imaplib and nntplib to
# 4 MiB (both limits are private module attributes of those libraries).
imaplib._MAXLINE = 1024 * 1024 * 4
nntplib._MAXLINE = 1024 * 1024 * 4

# Settings are read from nntpdup.conf in the current directory.  The script
# uses the [connection] section here and the [groups] section further down.
config = configparser.ConfigParser(allow_no_value = True)
config.read('nntpdup.conf')

# Extracts the message-id from the tail of an IMAP ENVELOPE response.
# Raw string: the pattern is full of backslashes that are not valid Python
# string escapes and would trigger SyntaxWarning on recent interpreters.
reMessageId = re.compile(r'(<[-\][a-zA-Z0-9@.%/=_\$+!&~#\?}]+>)"?\)\)(\d+ \(FLAGS\(\)\))?$')

try:
	# Plain-text NNTP; switch to nntplib.NNTP_SSL if the server requires TLS.
	server = nntplib.NNTP(config['connection']['newsserver'])
except nntplib.NNTPTemporaryError as err:
	if err.response.startswith('400 load at '):
		# The server refused the connection with a "400 load at ..." reply:
		# report it and finish with a success status instead of a traceback.
		print(err.response)
		sys.exit(0)
	raise

mserver = imaplib.IMAP4_SSL(config['connection']['mailserver'])
mserver.login(config['connection']['mail_user'], config['connection']['mail_password'])

# Per-run budgets: how many articles to append to IMAP and how many overview
# records to request from the news server.  Both are optional in the config.
mailLimit = config.getint('connection', 'mail_limit', fallback = 100)
headerLimit = config.getint('connection', 'header_limit', fallback = 1000)

# Schema of the state database: table name -> SQL statements that create it.
# Folder.__init__ executes the statements of every table that is not yet
# listed in sqlite_master.
tables = {
	# one row per news group; "last" holds the article number reached so far
	'list': [
		"create table list (id integer primary key, name text, last integer default 0);",
	],
	# one row per (group id, message-id); "mask" records where the id was seen
	'ids': [
		"create table ids (id integer, name text, mask integer, date integer);",
		"create unique index ids__id_name on ids(id, name);",
	],
}

class Folder:
	"""Message-id ledger for one news group at a time, stored in SQLite.

	Every row of the ``ids`` table carries a ``mask`` telling where the
	message-id has been seen: MAIL (1) in the IMAP folder, NEWS (2) on the
	news server, BOTH (3) in both places.  ``select()`` has to be called
	before any of the per-group methods.

	Attributes:
		db:    the sqlite3 connection.
		id:    row id of the selected group in the ``list`` table.
		name:  name of the selected group.
		last:  article number reached so far for the selected group.
		count: number of ids of the selected group that are present in mail.
		mask:  in-memory cache, message-id -> mask, for the selected group.
	"""

	MAIL = 1
	NEWS = 2
	BOTH = 3

	def __init__(self, filename):
		"""Open (or create) database *filename* and create any missing table."""
		self.db = sqlite3.connect(filename)
		self.id = None
		self.name = None
		self.last = 0
		self.count = 0
		self.mask = {}
		found = {row[0] for row in self.db.execute("select name from sqlite_master where type = 'table';")}
		for absent in set(tables).difference(found):
			for query in tables[absent]:
				self.db.execute(query)

	def select(self, folderName):
		"""Make *folderName* the current group, registering it on first use.

		Resets the mask cache and reloads ``last`` and ``count``.  Exits the
		process with status 1 if the group cannot be found even after it was
		inserted.
		"""
		query = "select id, last from list where name = ?;"
		self.name = folderName
		self.id = None
		rows = self.db.execute(query, [folderName]).fetchall()
		if not rows:
			self.db.execute("insert into list(name) values (?);", [folderName])
			rows = self.db.execute(query, [folderName]).fetchall()
		if rows:
			# should several rows share the name, the last one returned wins
			self.id, self.last = rows[-1]
		if self.id is None:
			print('Id not found.')
			sys.exit(1)
		self.mask = {}
		self.get_count()

	def get_count(self):
		"""Reload ``count``: the number of ids of this group present in mail."""
		row = self.db.execute("select count(*) from ids where id = ? and mask in (3, 1);", [self.id]).fetchone()
		self.count = row[0]

	def get_record_count(self, mask):
		"""Return how many ids of this group have exactly the given *mask*."""
		row = self.db.execute("select count(*) from ids where id = ? and mask = ?;", [self.id, mask]).fetchone()
		return row[0]

	def check(self, name):
		"""Return the mask stored for message-id *name*, or None if unknown.

		Only ids that exist are cached; unknown ids are looked up every time.
		"""
		if name in self.mask:
			return self.mask[name]
		for row in self.db.execute("select mask from ids where id = ? and name = ?;", [self.id, name]):
			self.mask[name] = row[0]
			return row[0]
		return None

	def _store_last(self):
		"""Write the in-memory ``last`` value to the ``list`` table."""
		self.db.execute("update list set last = ? where id = ?;", [self.last, self.id])

	def addlast(self, count):
		"""Advance the stored article position by *count*."""
		self.last += count
		self._store_last()

	def droplast(self):
		"""Reset the stored article position to 0."""
		self.last = 0
		self._store_last()

	def addmail(self, mid):
		"""Record that message-id *mid* is present in the IMAP folder.

		Idempotent: an id that already has the MAIL bit is left alone (the
		unique index would reject a second insert).  ``count`` grows by one
		whenever the id gains the MAIL bit, so it stays equal to what
		``get_count()`` would report.
		"""
		mask = self.check(mid)
		if mask in (self.MAIL, self.BOTH):
			return
		if mask == self.NEWS:
			self.db.execute("update ids set mask = 3 where id = ? and name = ?;", [self.id, mid])
			self.mask[mid] = self.BOTH
		else:
			self.db.execute("insert into ids(id, name, mask) values(?, ?, ?);", [self.id, mid, self.MAIL])
			self.mask[mid] = self.MAIL
		self.count += 1

	def addnews(self, mid, date = None):
		"""Record that message-id *mid* exists on the news server.

		*date* is stored alongside the id (None when it is not known).  An id
		that is already recorded keeps its MAIL bit, gains the NEWS bit and
		gets its date replaced; in particular a repeated news-only id no
		longer violates the unique index.  ``count`` is not touched because
		it only counts ids present in mail.
		"""
		mask = self.check(mid)
		if mask is None:
			self.db.execute("insert into ids(id, name, mask, date) values(?, ?, ?, ?);", [self.id, mid, self.NEWS, date])
			self.mask[mid] = self.NEWS
		else:
			merged = mask | self.NEWS
			self.db.execute("update ids set mask = ?, date = ? where id = ? and name = ?;", [merged, date, self.id, mid])
			self.mask[mid] = merged

	def zeromail(self):
		"""Clear the MAIL bit of every id of this group and commit.

		Ids seen in both places become news-only, mail-only ids are deleted,
		and ``count`` is reloaded.
		"""
		self.mask = {}
		self.db.execute("update ids set mask = 2 where id = ? and mask = 3;", [self.id])
		self.db.execute("delete from ids where id = ? and mask = 1;", [self.id])
		self.sync()
		self.get_count()

	def zeronews(self):
		"""Clear the NEWS bit of every id of this group and commit.

		Ids seen in both places become mail-only, news-only ids are deleted,
		and the stored article position is reset to 0.
		"""
		self.mask = {}
		self.db.execute("update ids set mask = 1 where id = ? and mask = 3;", [self.id])
		self.db.execute("delete from ids where id = ? and mask = 2;", [self.id])
		self.droplast()
		self.sync()

	def sync(self):
		"""Commit the pending database transaction."""
		self.db.commit()

	def get_unfetched(self):
		"""Return a cursor over (message-id, date) of news-only ids, newest date first."""
		return self.db.execute("select name, date from ids where id = ? and mask = 2 order by date desc;", [self.id])

	def forget(self, mid):
		"""Delete message-id *mid* from this group.

		Also drops it from the mask cache, so a later ``check()`` does not
		report the deleted row, and keeps ``count`` in step when the id was
		present in mail.
		"""
		mask = self.check(mid)
		self.db.execute("delete from ids where id = ? and name = ?;", [self.id, mid])
		self.mask.pop(mid, None)
		if mask in (self.MAIL, self.BOTH):
			self.count -= 1

def check_folder(mserver, folder, folderName):
	"""Rebuild the mail side of *folder*'s index from IMAP mailbox *folderName*.

	The MAIL marks of the currently selected group are cleared first, then
	the ENVELOPE of every message not flagged as deleted is fetched and its
	message-id is recorded.  A message whose id has already been recorded
	during this pass is a duplicate: it is flagged \\Deleted and removed by
	the final EXPUNGE.

	Progress goes to stdout: 'x' for each duplicate, '.' for every 1000
	newly indexed messages.

	Args:
		mserver:    logged-in imaplib connection.
		folder:     Folder object with the matching group already selected.
		folderName: mailbox name as the IMAP server knows it.

	Exits the process with status 1 when no message-id can be extracted from
	an ENVELOPE response.
	"""
	folder.zeromail()
	deleted = 0
	count = 0
	mserver.select(folderName)
	typ, data = mserver.search(None, 'NOT DELETED')
	print(' - building imap index', folderName, '[', end='')
	# guard against a search reply that carries no data at all ([None])
	for num in (data[0] or b'').split():
		typ, parts = mserver.fetch(num, '(ENVELOPE)')
		# response items are either bytes or tuples of bytes; flatten to text
		text = []
		for part in parts:
			if isinstance(part, tuple):
				text.extend(piece.decode('utf-8', 'ignore') for piece in part)
			else:
				text.append(part.decode('utf-8', 'ignore'))
		envelope = ''.join(text)
		isMid = reMessageId.search(envelope)
		if not isMid:
			print('Message id not found.')
			print(repr(envelope))
			sys.exit(1)
		mid = isMid.group(1)
		if folder.check(mid) in (1, 3):
			# the same message-id was already seen in this mailbox
			mserver.store(num, '+FLAGS', '\\Deleted')
			deleted += 1
			sys.stdout.write('x')
			sys.stdout.flush()
		else:
			folder.addmail(mid)
			count += 1
			# tick only when the counter has just reached a multiple of 1000,
			# not on every iteration during which it happens to sit on one
			if count % 1000 == 0:
				sys.stdout.write('.')
				sys.stdout.flush()
	print('], deleted:', deleted)
	folder.sync()
	mserver.expunge()

folder = Folder('nntpdup.sqlite')

# Running budgets: limits[0] for overview records, limits[1] for appended
# articles.  Every group adds an equal share (limitSteps) before it is
# processed and subtracts what it really used, so the unused part of a share
# carries over to the groups processed later in the same run.
limits = [0, 0]
# max(..., 1): an empty [groups] section must not end in ZeroDivisionError
groupCount = max(len(config['groups']), 1)
limitSteps = [headerLimit / groupCount, mailLimit / groupCount]

# number of tab stops needed to align the statistics column of the report
maxlength = max((len(folderName) for folderName in config['groups'].keys()), default = 0)
skew = 1 + maxlength // 8

for folderName in (set(config['groups'].keys())):
	stats = [0, 0]
	folder.select(folderName)
	localFolderName = folderName

	resp = mserver.select(localFolderName)
	print('#--', localFolderName, ':', resp)
	if resp[0] != 'OK':
		# retry with '/' instead of '.' between the name components
		localFolderName = folderName.replace('.', '/')
		resp = mserver.select(localFolderName)
		if resp[0] != 'OK':
			print("Can't open folder.")
			sys.exit(1)
	# the mailbox holds a different number of messages than the index knows
	# about: rebuild the mail side of the index
	if int(resp[1][0]) != folder.count:
		check_folder(mserver, folder, localFolderName)

	_, _, _, last, _ = server.group(folderName)
	limits[0] += limitSteps[0]
	if last > folder.last:
		count = 0
		# we need to fetch new ids
		# int(): the budget is fractional, the article position must not be
		request = int(min(last, folder.last + limits[0]))
		try:
			for record in server.over((int(folder.last) + 1, request))[1]:
				mid = record[1]['message-id']
				if len(mid) > 0:
					try:
						date = email.utils.parsedate_to_datetime(record[1]['date']).timestamp()
					except (OverflowError, TypeError, ValueError):
						# unusable Date header (ValueError since Python 3.10,
						# TypeError before): record the id without a date
						date = None
					folder.addnews(mid, date)
				count += 1
		except nntplib.NNTPTemporaryError as err:
			# a 423 reply to OVER is tolerated; the position still advances
			if not err.response.startswith('423 '):
				raise
		except nntplib.NNTPPermanentError:
			print(folder.last, request)
			raise
		except sqlite3.IntegrityError:
			# dump the offending overview record and the stored row
			print(repr(record))
			print([repr(row) for row in folder.db.execute("select * from ids where id = ? and name = ?;", [folder.id, record[1]['message-id']])])
			raise
		stats[0] = count
		limits[0] -= count
		folder.addlast(request - folder.last)
		folder.sync()
	elif folder.get_record_count(1) > 0:
		# nothing new on the server, yet some ids are known from mail only:
		# reset the position so that the overview is read from the start again
		folder.droplast()

	limits[1] += limitSteps[1]
	if folder.get_record_count(2) > 0:
		count = 0
		# there are extra articles
		# collect them first: the loop below changes the ids table
		unfetched = [(item, env_date) for item, env_date in folder.get_unfetched() if folder.check(item) == 2]
		for item, env_date in unfetched:
			try:
				_, info = server.article(item)
				body = b'\n'.join(info.lines)
				if env_date is None or env_date < 0:
					# no usable date from the overview: look into the headers
					date = None
					backup_date = None
					out = []
					for line in info.lines:
						if len(line) == 0:
							# blank line: end of the header block
							mesg = email.message_from_string('\n'.join(out))
							for name, value in mesg.items():
								if name == 'Date':
									date = email.utils.parsedate(value)
								elif name == 'Original-Received':
									# the timestamp follows the last ';'
									tmp_date = email.utils.parsedate(value.split(';')[-1])
									if tmp_date is not None and tmp_date[0] >= 1970:
										backup_date = tmp_date
							if date is None and backup_date is None:
								print('Date missed.')
								print(repr(out))
								sys.exit(1)
							elif date is None:
								date = backup_date
							break
						out.append(line.decode('ascii', 'ignore'))
					try:
						mserver.append(localFolderName, None, date, body)
					except (AttributeError, OverflowError):
						# the Date header could not be converted by imaplib:
						# fall back to the Original-Received date (or none)
						mserver.append(localFolderName, None, backup_date, body)
				else:
					mserver.append(localFolderName, None, env_date, body)
				folder.addmail(item)
				folder.sync()
				count += 1
				if count >= limits[1]:
					break
			except nntplib.NNTPTemporaryError as err:
				if err.response.startswith('430 No such article'):
					# the article is gone from the server: drop its id
					folder.forget(item)
				else:
					print(err.response, item, env_date)
					raise
		stats[1] = count
		limits[1] -= count

	# report only the groups where something happened
	if stats[0] != 0 or stats[1] != 0:
		print('# ', folderName, '\t'*(skew - int((len(folderName) + 2) / 8)), '\t'.join(map(str, stats)), sep = '')
	folder.sync()