Lines of
src/main.rs
from check-in f4cad2a5c0
that are changed by the sequence of edits moving toward
check-in 37a0139d49:
1: use anyhow::{
2: anyhow,
3: Result,
4: };
5: use async_std::task;
6: use samotop::{
7: mail::{
8: Builder,
9: DebugService,
10: MailDir,
11: Name
12: },
13: smtp::{
14: SmtpParser,
15: Prudence,
16: },
17: };
18: use teloxide::{
19: Bot,
20: prelude::{
21: Requester,
22: RequesterExt,
23: },
24: types::{
25: ChatId,
26: ParseMode::MarkdownV2,
27: },
28: };
29:
30: use std::{
31: borrow::Cow,
32: collections::{
33: HashMap,
34: HashSet,
35: },
36: io::Read,
37: os::unix::fs::{
38: FileTypeExt,
39: PermissionsExt,
40: },
41: path::{
42: Path,
43: PathBuf
44: },
45: time::Duration,
46: vec::Vec,
47: };
48:
49: fn address_into_iter<'a>(addr: &'a mail_parser::Address<'a, >) -> impl Iterator<Item = Cow<'a, str>> {
50: addr.clone().into_list().into_iter().map(|a| a.address.unwrap())
51: }
52:
53: async fn relay_mails(maildir: &Path, core: &TelegramTransport) -> Result<()> {
54: let new_dir = maildir.join("new");
55:
56: std::fs::create_dir_all(&new_dir)?;
57:
58: let files = std::fs::read_dir(new_dir)?;
59: for file in files {
60: let file = file?;
61: let mut buf: String = Default::default();
62: std::fs::File::open(file.path())?.read_to_string(&mut buf)?;
63:
64: let mail = mail_parser::MessageParser::new().parse(&buf)
65: .ok_or(anyhow!("Failed to parse mail `{:?}`.", file))?;
66:
67: // Fetching address lists from fields we know
68: let mut to = HashSet::new();
69: if let Some(addr) = mail.to() {
70: let _ = address_into_iter(addr).map(|x| to.insert(x));
71: };
72: if let Some(addr) = mail.header("X-Samotop-To") {
73: match addr {
74: mail_parser::HeaderValue::Address(addr) => {
75: let _ = address_into_iter(addr).map(|x| to.insert(x));
76: },
77: mail_parser::HeaderValue::Text(text) => {
78: to.insert(text.clone());
79: },
80: _ => {}
81: }
82: };
83:
84: // Adding all known addresses to recipient list, for anyone else adding default
85: // Also if list is empty also adding default
86: let mut rcpt: HashSet<&ChatId> = HashSet::new();
87: for item in to {
88: let item = item.into_owned();
89: match core.recipients.get(&item) {
90: Some(addr) => rcpt.insert(addr),
91: None => {
92: core.debug(format!("Recipient [{}] not found.", &item)).await?;
93: rcpt.insert(core.recipients.get("_")
94: .ok_or(anyhow!("Missing default address in recipient table."))?)
95: }
96: };
97: };
98: if rcpt.is_empty() {
99: core.debug("No recipient or envelope address.").await?;
100: rcpt.insert(core.recipients.get("_")
101: .ok_or(anyhow!("Missing default address in recipient table."))?);
102: };
103:
104: // prepating message header
105: let mut reply: Vec<Cow<'_, str>> = vec![];
106: if let Some(subject) = mail.subject() {
107: reply.push(format!("**Subject:** `{}`", subject).into());
108: } else if let Some(thread) = mail.thread_name() {
109: reply.push(format!("**Thread:** `{}`", thread).into());
110: }
111: if let Some(from) = mail.from() {
112: reply.push(format!("**From:** `{:?}`", address_into_iter(from).collect::<Vec<_>>().join(", ")).into());
113: }
114: if let Some(sender) = mail.sender() {
115: reply.push(format!("**Sender:** `{:?}`", address_into_iter(sender).collect::<Vec<_>>().join(", ")).into());
116: }
117: reply.push("".into());
118: let header_size = reply.join("\n").len() + 1;
119:
120: let html_parts = mail.html_body_count();
121: let text_parts = mail.text_body_count();
122: let attachments = mail.attachment_count();
123: if html_parts != text_parts {
124: core.debug(format!("Hm, we have {} HTML parts and {} text parts.", html_parts, text_parts)).await?;
125: }
126: //let mut html_num = 0;
127: let mut text_num = 0;
128: let mut file_num = 0;
129: // let's display first html or text part as body
130: let mut body = "".into();
131: /*
132: * actually I don't wanna parse that html stuff
133: if html_parts > 0 {
134: let text = mail.body_html(0).unwrap();
135: if text.len() < 4096 - header_size {
136: body = text;
137: html_num = 1;
138: }
139: };
140: */
141: if body == "" && text_parts > 0 {
142: let text = mail.body_text(0)
143: .ok_or(anyhow!("Failed to extract text from message."))?;
144: if text.len() < 4096 - header_size {
145: body = text;
146: text_num = 1;
147: }
148: };
149: reply.push("```".into());
150: for line in body.lines() {
151: reply.push(line.into());
152: }
153: reply.push("```".into());
154:
155: // and let's collect all other attachment parts
156: let mut files_to_send = vec![];
157: /*
158: * let's just skip html parts for now, they just duplicate text?
159: while html_num < html_parts {
160: files_to_send.push(mail.html_part(html_num).unwrap());
161: html_num += 1;
162: }
163: */
164: while text_num < text_parts {
165: files_to_send.push(mail.text_part(text_num)
166: .ok_or(anyhow!("Failed to get text part from message"))?);
167: text_num += 1;
168: }
169: while file_num < attachments {
170: files_to_send.push(mail.attachment(file_num)
171: .ok_or(anyhow!("Failed to get file part from message"))?);
172: file_num += 1;
173: }
174:
175: let msg = reply.join("\n");
176: for chat in rcpt {
177: if !files_to_send.is_empty() {
178: let mut files = vec![];
179: let mut first_one = true;
180: for chunk in &files_to_send {
181: let data = chunk.contents();
182: let mut filename: Option<String> = None;
183: for header in chunk.headers() {
184: if header.name() == "Content-Type" {
185: match header.value() {
186: mail_parser::HeaderValue::ContentType(contenttype) => {
187: if let Some(fname) = contenttype.attribute("name") {
188: filename = Some(fname.to_owned());
189: }
190: },
191: _ => {
192: core.debug("Attachment has bad ContentType header.").await?;
193: },
194: };
195: };
196: };
197: let filename = if let Some(fname) = filename {
198: fname
199: } else {
200: "Attachment.txt".into()
201: };
202: let item = teloxide::types::InputMediaDocument::new(
203: teloxide::types::InputFile::memory(data.to_vec())
204: .file_name(filename));
205: let item = if first_one {
206: first_one = false;
207: item.caption(&msg).parse_mode(MarkdownV2)
208: } else {
209: item
210: };
211: files.push(teloxide::types::InputMedia::Document(item));
212: }
213: core.sendgroup(chat, files).await?;
214: } else {
215: core.send(chat, &msg).await?;
216: }
217: }
218:
219: std::fs::remove_file(file.path())?;
220: }
221: Ok(())
222: }
223:
224: fn my_prudence() -> Prudence {
225: Prudence::default().with_read_timeout(Duration::from_secs(60)).with_banner_delay(Duration::from_secs(1))
226: }
227:
228: pub struct TelegramTransport {
229: tg: teloxide::adaptors::DefaultParseMode<Bot>,
230: recipients: HashMap<String, ChatId>,
231: }
232:
233: impl TelegramTransport {
234: pub fn new(settings: config::Config) -> TelegramTransport {
235: let tg = Bot::new(settings.get_string("api_key")
236: .expect("[smtp2tg.toml] missing \"api_key\" parameter.\n"))
237: .parse_mode(MarkdownV2);
238: let recipients: HashMap<String, ChatId> = settings.get_table("recipients")
239: .expect("[smtp2tg.toml] missing table \"recipients\".\n")
240: .into_iter().map(|(a, b)| (a, ChatId (b.into_int()
241: .expect("[smtp2tg.toml] \"recipient\" table values should be integers.\n")
242: ))).collect();
243: if !recipients.contains_key("_") {
244: eprintln!("[smtp2tg.toml] \"recipient\" table misses \"default_recipient\".\n");
245: panic!("no default recipient");
246: }
247:
248: TelegramTransport {
249: tg,
250: recipients,
251: }
252: }
253:
254: pub async fn debug<'b, S>(&self, msg: S) -> Result<teloxide::types::Message>
255: where S: Into<String> {
256: task::sleep(Duration::from_secs(5)).await;
257: Ok(self.tg.send_message(*self.recipients.get("_").unwrap(), msg).await?)
258: }
259:
260: pub async fn send<'b, S>(&self, to: &ChatId, msg: S) -> Result<teloxide::types::Message>
261: where S: Into<String> {
262: task::sleep(Duration::from_secs(5)).await;
263: Ok(self.tg.send_message(*to, msg).await?)
264: }
265:
266: pub async fn sendgroup<M>(&self, to: &ChatId, media: M) -> Result<Vec<teloxide::types::Message>>
267: where M: IntoIterator<Item = teloxide::types::InputMedia> {
268: task::sleep(Duration::from_secs(5)).await;
269: Ok(self.tg.send_media_group(*to, media).await?)
270: }
271: }
272:
273: #[async_std::main]
274: async fn main() {
275: let settings: config::Config = config::Config::builder()
276: .add_source(config::File::with_name("smtp2tg.toml"))
277: .build()
278: .expect("[smtp2tg.toml] there was an error reading config\n\
279: \tplease consult \"smtp2tg.toml.example\" for details");
280:
281: let maildir: PathBuf = settings.get_string("maildir")
282: .expect("[smtp2tg.toml] missing \"maildir\" parameter.\n").into();
283: let listen_on = settings.get_string("listen_on")
284: .expect("[smtp2tg.toml] missing \"listen_on\" parameter.\n");
285: let core = TelegramTransport::new(settings);
286: let sink = Builder + Name::new("smtp2tg") + DebugService +
287: my_prudence() + MailDir::new(maildir.clone()).unwrap();
288:
289: env_logger::init();
290:
291: task::spawn(async move {
292: loop {
293: // relay mails
294: if let Err(err) = relay_mails(&maildir, &core).await {
295: // in case that fails - inform default recipient
296: if let Err(err) = core.debug(format!("Sending emails failed:\n{:?}", err)).await {
297: // in case that also fails - write some logs and bail
298: eprintln!("Failed to contact Telegram:\n{:?}", err);
f4cad2a5c0 2024-05-26 299: task::sleep(Duration::from_secs(5 * 60)).await;
300: };
301: };
302: task::sleep(Duration::from_secs(5)).await;
303: }
304: });
305:
306: match listen_on.as_str() {
307: "socket" => {
308: let socket_path = "./smtp2tg.sock";
309: match std::fs::symlink_metadata(socket_path) {
310: Ok(metadata) => {
311: if metadata.file_type().is_socket() {
312: std::fs::remove_file(socket_path)
313: .expect("[smtp2tg] failed to remove old socket.\n");
314: } else {
315: eprintln!("[smtp2tg] \"./smtp2tg.sock\" we wanted to use is actually not a socket.\n\
316: [smtp2tg] please check the file and remove it manually.\n");
317: panic!("socket path unavailable");
318: }
319: },
320: Err(err) => {
321: match err.kind() {
322: std::io::ErrorKind::NotFound => {},
323: _ => {
324: eprintln!("{:?}", err);
325: panic!("unhandled file type error");
326: }
327: };
328: }
329: };
330:
331: let sink = sink + samotop::smtp::Lmtp.with(SmtpParser);
332: task::spawn(async move {
333: // Postpone mode change on the socket. I can't actually change
334: // other way, as UnixServer just grabs path, and blocks
335: task::sleep(Duration::from_secs(1)).await;
336: std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o777)).unwrap();
337: });
338: samotop::server::UnixServer::on(socket_path)
339: .serve(sink.build()).await.unwrap();
340: },
341: _ => {
342: let sink = sink + samotop::smtp::Esmtp.with(SmtpParser);
343: samotop::server::TcpServer::on(listen_on)
344: .serve(sink.build()).await.unwrap();
345: },
346: };
347: }