Lines of
src/main.rs
from check-in b311abee46
that are changed by the sequence of edits moving toward
check-in 866aad57a4:
1: use anyhow::{
2: anyhow,
3: Result,
4: };
5: use async_std::task;
6: use samotop::{
7: mail::{
8: Builder,
9: DebugService,
10: MailDir,
11: Name
12: },
13: smtp::{
14: SmtpParser,
15: Prudence,
16: },
17: };
18: use teloxide::{
19: Bot,
20: prelude::{
21: Requester,
22: RequesterExt,
23: },
24: types::{
25: ChatId,
26: ParseMode::MarkdownV2,
27: },
28: };
29:
30: use std::{
31: borrow::Cow,
32: collections::{
33: HashMap,
34: HashSet,
35: },
36: io::Read,
37: os::unix::fs::{
38: FileTypeExt,
39: PermissionsExt,
40: },
41: path::{
42: Path,
43: PathBuf
44: },
45: time::Duration,
46: vec::Vec,
47: };
48:
49: fn address_into_iter<'a>(addr: &'a mail_parser::Address<'a, >) -> impl Iterator<Item = Cow<'a, str>> {
50: addr.clone().into_list().into_iter().map(|a| a.address.unwrap())
51: }
52:
53: async fn relay_mails(maildir: &Path, core: &TelegramTransport) -> Result<()> {
54: let new_dir = maildir.join("new");
55:
56: std::fs::create_dir_all(&new_dir)?;
57:
58: let files = std::fs::read_dir(new_dir)?;
59: for file in files {
60: let file = file?;
61: let mut buf: String = Default::default();
62: std::fs::File::open(file.path())?.read_to_string(&mut buf)?;
63:
64: let mail = mail_parser::MessageParser::new().parse(&buf)
65: .ok_or(anyhow!("Failed to parse mail `{:?}`.", file))?;
66:
67: // Fetching address lists from fields we know
68: let mut to = HashSet::new();
69: if let Some(addr) = mail.to() {
70: let _ = address_into_iter(addr).map(|x| to.insert(x));
71: };
72: if let Some(addr) = mail.header("X-Samotop-To") {
73: match addr {
74: mail_parser::HeaderValue::Address(addr) => {
75: let _ = address_into_iter(addr).map(|x| to.insert(x));
76: },
77: mail_parser::HeaderValue::Text(text) => {
78: to.insert(text.clone());
79: },
80: _ => {}
81: }
82: };
83:
84: // Adding all known addresses to recipient list, for anyone else adding default
85: // Also if list is empty also adding default
86: let mut rcpt: HashSet<&ChatId> = HashSet::new();
87: for item in to {
88: let item = item.into_owned();
89: match core.recipients.get(&item) {
90: Some(addr) => rcpt.insert(addr),
91: None => {
92: core.debug(format!("Recipient [{}] not found.", &item)).await?;
93: rcpt.insert(core.recipients.get("_")
94: .ok_or(anyhow!("Missing default address in recipient table."))?)
95: }
96: };
97: };
98: if rcpt.is_empty() {
99: core.debug("No recipient or envelope address.").await?;
100: rcpt.insert(core.recipients.get("_")
101: .ok_or(anyhow!("Missing default address in recipient table."))?);
102: };
103:
104: // prepating message header
105: let mut reply: Vec<Cow<'_, str>> = vec![];
106: if let Some(subject) = mail.subject() {
107: reply.push(format!("**Subject:** `{}`", subject).into());
108: } else if let Some(thread) = mail.thread_name() {
109: reply.push(format!("**Thread:** `{}`", thread).into());
110: }
111: if let Some(from) = mail.from() {
112: reply.push(format!("**From:** `{:?}`", address_into_iter(from).collect::<Vec<_>>().join(", ")).into());
113: }
114: if let Some(sender) = mail.sender() {
115: reply.push(format!("**Sender:** `{:?}`", address_into_iter(sender).collect::<Vec<_>>().join(", ")).into());
116: }
117: reply.push("".into());
118: let header_size = reply.join("\n").len() + 1;
119:
120: let html_parts = mail.html_body_count();
121: let text_parts = mail.text_body_count();
122: let attachments = mail.attachment_count();
123: if html_parts != text_parts {
124: core.debug(format!("Hm, we have {} HTML parts and {} text parts.", html_parts, text_parts)).await?;
125: }
126: //let mut html_num = 0;
127: let mut text_num = 0;
128: let mut file_num = 0;
129: // let's display first html or text part as body
130: let mut body = "".into();
131: /*
132: * actually I don't wanna parse that html stuff
133: if html_parts > 0 {
134: let text = mail.body_html(0).unwrap();
135: if text.len() < 4096 - header_size {
136: body = text;
137: html_num = 1;
138: }
139: };
140: */
141: if body == "" && text_parts > 0 {
142: let text = mail.body_text(0)
143: .ok_or(anyhow!("Failed to extract text from message."))?;
144: if text.len() < 4096 - header_size {
145: body = text;
146: text_num = 1;
147: }
148: };
149: reply.push("```".into());
b311abee46 2024-06-13 150: for line in body.lines() {
b311abee46 2024-06-13 151: reply.push(line.into());
b311abee46 2024-06-13 152: }
153: reply.push("```".into());
154:
155: // and let's collect all other attachment parts
156: let mut files_to_send = vec![];
157: /*
158: * let's just skip html parts for now, they just duplicate text?
159: while html_num < html_parts {
160: files_to_send.push(mail.html_part(html_num).unwrap());
161: html_num += 1;
162: }
163: */
164: while text_num < text_parts {
165: files_to_send.push(mail.text_part(text_num)
166: .ok_or(anyhow!("Failed to get text part from message"))?);
167: text_num += 1;
168: }
169: while file_num < attachments {
170: files_to_send.push(mail.attachment(file_num)
171: .ok_or(anyhow!("Failed to get file part from message"))?);
172: file_num += 1;
173: }
174:
175: let msg = reply.join("\n");
176: for chat in rcpt {
177: if !files_to_send.is_empty() {
178: let mut files = vec![];
179: let mut first_one = true;
180: for chunk in &files_to_send {
181: let data = chunk.contents();
182: let mut filename: Option<String> = None;
183: for header in chunk.headers() {
184: if header.name() == "Content-Type" {
185: match header.value() {
186: mail_parser::HeaderValue::ContentType(contenttype) => {
187: if let Some(fname) = contenttype.attribute("name") {
188: filename = Some(fname.to_owned());
189: }
190: },
191: _ => {
192: core.debug("Attachment has bad ContentType header.").await?;
193: },
194: };
195: };
196: };
197: let filename = if let Some(fname) = filename {
198: fname
199: } else {
200: "Attachment.txt".into()
201: };
202: let item = teloxide::types::InputMediaDocument::new(
203: teloxide::types::InputFile::memory(data.to_vec())
204: .file_name(filename));
205: let item = if first_one {
206: first_one = false;
207: item.caption(&msg).parse_mode(MarkdownV2)
208: } else {
209: item
210: };
b311abee46 2024-06-13 211: files.push(teloxide::types::InputMedia::Document(item));
212: }
213: core.sendgroup(chat, files).await?;
214: } else {
215: core.send(chat, &msg).await?;
216: }
217: }
218:
219: std::fs::remove_file(file.path())?;
220: }
221: Ok(())
222: }
223:
224: fn my_prudence() -> Prudence {
225: Prudence::default().with_read_timeout(Duration::from_secs(60)).with_banner_delay(Duration::from_secs(1))
226: }
227:
228: pub struct TelegramTransport {
229: tg: teloxide::adaptors::DefaultParseMode<teloxide::adaptors::Throttle<Bot>>,
230: recipients: HashMap<String, ChatId>,
231: }
232:
233: impl TelegramTransport {
234: pub fn new(settings: config::Config) -> TelegramTransport {
235: let tg = Bot::new(settings.get_string("api_key")
236: .expect("[smtp2tg.toml] missing \"api_key\" parameter.\n"))
237: .throttle(teloxide::adaptors::throttle::Limits::default())
238: .parse_mode(MarkdownV2);
239: let recipients: HashMap<String, ChatId> = settings.get_table("recipients")
240: .expect("[smtp2tg.toml] missing table \"recipients\".\n")
241: .into_iter().map(|(a, b)| (a, ChatId (b.into_int()
242: .expect("[smtp2tg.toml] \"recipient\" table values should be integers.\n")
243: ))).collect();
244: if !recipients.contains_key("_") {
245: eprintln!("[smtp2tg.toml] \"recipient\" table misses \"default_recipient\".\n");
246: panic!("no default recipient");
247: }
248:
249: TelegramTransport {
250: tg,
251: recipients,
252: }
253: }
254:
b311abee46 2024-06-13 255: pub async fn debug<'b, S>(&self, msg: S) -> Result<teloxide::types::Message>
256: where S: Into<String> {
257: Ok(self.tg.send_message(*self.recipients.get("_").unwrap(), msg).await?)
258: }
259:
b311abee46 2024-06-13 260: pub async fn send<'b, S>(&self, to: &ChatId, msg: S) -> Result<teloxide::types::Message>
261: where S: Into<String> {
262: Ok(self.tg.send_message(*to, msg).await?)
263: }
264:
b311abee46 2024-06-13 265: pub async fn sendgroup<M>(&self, to: &ChatId, media: M) -> Result<Vec<teloxide::types::Message>>
b311abee46 2024-06-13 266: where M: IntoIterator<Item = teloxide::types::InputMedia> {
267: Ok(self.tg.send_media_group(*to, media).await?)
268: }
269: }
270:
271: #[async_std::main]
272: async fn main() {
273: let settings: config::Config = config::Config::builder()
274: .add_source(config::File::with_name("smtp2tg.toml"))
275: .build()
276: .expect("[smtp2tg.toml] there was an error reading config\n\
277: \tplease consult \"smtp2tg.toml.example\" for details");
278:
279: let maildir: PathBuf = settings.get_string("maildir")
280: .expect("[smtp2tg.toml] missing \"maildir\" parameter.\n").into();
281: let listen_on = settings.get_string("listen_on")
282: .expect("[smtp2tg.toml] missing \"listen_on\" parameter.\n");
283: let core = TelegramTransport::new(settings);
284: let sink = Builder + Name::new("smtp2tg") + DebugService +
285: my_prudence() + MailDir::new(maildir.clone()).unwrap();
286:
287: task::spawn(async move {
288: loop {
289: // relay mails
290: if let Err(err) = relay_mails(&maildir, &core).await {
291: // in case that fails - inform default recipient
292: if let Err(err) = core.debug(format!("Sending emails failed:\n{:?}", err)).await {
293: // in case that also fails - write some logs and bail
294: eprintln!("Failed to contact Telegram:\n{:?}", err);
295: };
296: task::sleep(Duration::from_secs(5 * 60)).await;
297: };
298: task::sleep(Duration::from_secs(5)).await;
299: }
300: });
301:
302: match listen_on.as_str() {
303: "socket" => {
304: let socket_path = "./smtp2tg.sock";
305: match std::fs::symlink_metadata(socket_path) {
306: Ok(metadata) => {
307: if metadata.file_type().is_socket() {
308: std::fs::remove_file(socket_path)
309: .expect("[smtp2tg] failed to remove old socket.\n");
310: } else {
311: eprintln!("[smtp2tg] \"./smtp2tg.sock\" we wanted to use is actually not a socket.\n\
312: [smtp2tg] please check the file and remove it manually.\n");
313: panic!("socket path unavailable");
314: }
315: },
316: Err(err) => {
317: match err.kind() {
318: std::io::ErrorKind::NotFound => {},
319: _ => {
320: eprintln!("{:?}", err);
321: panic!("unhandled file type error");
322: }
323: };
324: }
325: };
326:
327: let sink = sink + samotop::smtp::Lmtp.with(SmtpParser);
328: task::spawn(async move {
329: // Postpone mode change on the socket. I can't actually change
330: // other way, as UnixServer just grabs path, and blocks
331: task::sleep(Duration::from_secs(1)).await;
332: std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o777)).unwrap();
333: });
334: samotop::server::UnixServer::on(socket_path)
335: .serve(sink.build()).await.unwrap();
336: },
337: _ => {
338: let sink = sink + samotop::smtp::Esmtp.with(SmtpParser);
339: samotop::server::TcpServer::on(listen_on)
340: .serve(sink.build()).await.unwrap();
341: },
342: };
343: }