Lines of
src/sql.rs
from check-in b4af85e31b
that are changed by the sequence of edits moving toward
check-in c6d3e97290:
1: use std::{
2: borrow::Cow,
3: fmt,
4: };
5:
6: use anyhow::{
7: Result,
8: bail,
9: };
10: use async_std::sync::{
11: Arc,
12: Mutex,
13: };
14: use chrono::{
15: DateTime,
16: FixedOffset,
17: Local,
18: };
19: use sqlx::{
20: Postgres,
21: Row,
22: postgres::PgPoolOptions,
23: pool::PoolConnection,
24: };
25:
26: #[derive(sqlx::FromRow, Debug)]
27: pub struct List {
28: pub source_id: i32,
29: pub channel: String,
30: pub enabled: bool,
31: pub url: String,
32: pub iv_hash: Option<String>,
33: pub url_re: Option<String>,
34: }
35:
36: impl fmt::Display for List {
37: fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
b4af85e31b 2025-06-29 38: write!(f, "\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
39: match self.enabled {
40: true => "š enabled",
41: false => "ā disabled",
42: }, self.url)?;
43: if let Some(iv_hash) = &self.iv_hash {
44: write!(f, "\nIV: `{iv_hash}`")?;
45: }
46: if let Some(url_re) = &self.url_re {
47: write!(f, "\nRE: `{url_re}`")?;
48: }
49: Ok(())
50: }
51: }
52:
53: #[derive(sqlx::FromRow, Debug)]
54: pub struct Source {
55: pub channel_id: i64,
56: pub url: String,
57: pub iv_hash: Option<String>,
58: pub owner: i64,
59: pub url_re: Option<String>,
60: }
61:
62: #[derive(sqlx::FromRow)]
63: pub struct Queue {
64: pub source_id: Option<i32>,
65: pub next_fetch: Option<DateTime<Local>>,
66: pub owner: Option<i64>,
67: }
68:
69: #[derive(Clone)]
b4af85e31b 2025-06-29 70: pub struct Db {
b4af85e31b 2025-06-29 71: pool: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
b4af85e31b 2025-06-29 72: }
b4af85e31b 2025-06-29 73:
b4af85e31b 2025-06-29 74: pub struct Conn{
b4af85e31b 2025-06-29 75: conn: PoolConnection<Postgres>,
b4af85e31b 2025-06-29 76: }
77:
78: impl Db {
79: pub fn new (pguri: &str) -> Result<Db> {
b4af85e31b 2025-06-29 80: Ok(Db{
b4af85e31b 2025-06-29 81: pool: Arc::new(Mutex::new(PgPoolOptions::new()
82: .max_connections(5)
83: .acquire_timeout(std::time::Duration::new(300, 0))
84: .idle_timeout(std::time::Duration::new(60, 0))
85: .connect_lazy(pguri)?)),
b4af85e31b 2025-06-29 86: })
87: }
88:
89: pub async fn begin(&self) -> Result<Conn> {
b4af85e31b 2025-06-29 90: let pool = self.pool.lock_arc().await;
b4af85e31b 2025-06-29 91: let conn = Conn::new(pool.acquire().await?).await?;
92: Ok(conn)
93: }
94: }
95:
96: impl Conn {
b4af85e31b 2025-06-29 97: pub async fn new (conn: PoolConnection<Postgres>) -> Result<Conn> {
b4af85e31b 2025-06-29 98: Ok(Conn{
b4af85e31b 2025-06-29 99: conn,
b4af85e31b 2025-06-29 100: })
b4af85e31b 2025-06-29 101: }
b4af85e31b 2025-06-29 102:
103: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
104: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
105: .bind(source_id)
106: .bind(date)
107: .bind(post_url)
b4af85e31b 2025-06-29 108: .execute(&mut *self.conn).await?;
109: Ok(())
110: }
111:
112: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
113: where I: Into<i64> {
114: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
115: .bind(source_id)
116: .bind(owner.into())
b4af85e31b 2025-06-29 117: .execute(&mut *self.conn).await?.rows_affected() {
118: 0 => { Ok("No data found found.".into()) },
119: x => { Ok(format!("{x} posts purged.").into()) },
120: }
121: }
122:
123: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
124: where I: Into<i64> {
125: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
126: .bind(source_id)
127: .bind(owner.into())
b4af85e31b 2025-06-29 128: .execute(&mut *self.conn).await?.rows_affected() {
129: 0 => { Ok("No data found found.".into()) },
130: x => { Ok(format!("{} sources removed.", x).into()) },
131: }
132: }
133:
134: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
135: where I: Into<i64> {
136: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
137: .bind(source_id)
138: .bind(owner.into())
b4af85e31b 2025-06-29 139: .execute(&mut *self.conn).await?.rows_affected() {
140: 1 => { Ok("Source disabled.") },
141: 0 => { Ok("Source not found.") },
142: _ => { bail!("Database error.") },
143: }
144: }
145:
146: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
147: where I: Into<i64> {
148: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
149: .bind(source_id)
150: .bind(owner.into())
b4af85e31b 2025-06-29 151: .execute(&mut *self.conn).await?.rows_affected() {
152: 1 => { Ok("Source enabled.") },
153: 0 => { Ok("Source not found.") },
154: _ => { bail!("Database error.") },
155: }
156: }
157:
158: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
159: where I: Into<i64> {
160: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
161: .bind(post_url)
162: .bind(id.into())
b4af85e31b 2025-06-29 163: .fetch_one(&mut *self.conn).await?;
164: let exists: Option<bool> = row.try_get("exists")?;
165: Ok(exists)
166: }
167:
168: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
169: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
b4af85e31b 2025-06-29 170: .fetch_all(&mut *self.conn).await?;
171: Ok(block)
172: }
173:
174: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
175: where I: Into<i64> {
176: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
177: .bind(owner.into())
b4af85e31b 2025-06-29 178: .fetch_all(&mut *self.conn).await?;
179: Ok(source)
180: }
181:
182: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
183: where I: Into<i64> {
184: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
185: .bind(owner.into())
186: .bind(id)
b4af85e31b 2025-06-29 187: .fetch_optional(&mut *self.conn).await?;
188: Ok(source)
189: }
190:
191: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
192: where I: Into<i64> {
193: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
194: .bind(id)
195: .bind(owner.into())
b4af85e31b 2025-06-29 196: .fetch_one(&mut *self.conn).await?;
197: Ok(source)
198: }
199:
200: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
201: where I: Into<i64> {
202: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
203: .bind(id.into())
b4af85e31b 2025-06-29 204: .execute(&mut *self.conn).await?;
205: Ok(())
206: }
207:
208: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
209: where I: Into<i64> {
210: match match update {
211: Some(id) => {
212: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
213: .bind(id)
214: },
215: None => {
216: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
217: },
218: }
219: .bind(channel_id)
220: .bind(url)
221: .bind(iv_hash)
222: .bind(owner.into())
223: .bind(channel)
224: .bind(url_re)
b4af85e31b 2025-06-29 225: .execute(&mut *self.conn).await
226: {
227: Ok(_) => Ok(match update {
228: Some(_) => "Channel updated.",
229: None => "Channel added.",
230: }),
231: Err(sqlx::Error::Database(err)) => {
232: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
233: Some("_bt_check_unique", ) => {
234: Ok("Duplicate key.")
235: },
236: Some(_) => {
237: Ok("Database error.")
238: },
239: None => {
240: Ok("No database error extracted.")
241: },
242: }
243: },
244: Err(err) => {
245: bail!("Sorry, unknown error:\n{err:#?}\n");
246: },
247: }
248: }
249: }