Lines of
src/sql.rs
from check-in 44575a91d3
that are changed by the sequence of edits moving toward
check-in acb0a4ac54:
1: use std::{
2: borrow::Cow,
3: fmt,
4: };
5:
6: use async_std::sync::{
7: Arc,
8: Mutex,
9: };
10: use chrono::{
11: DateTime,
12: FixedOffset,
13: Local,
14: };
15: use sqlx::{
16: Postgres,
17: Row,
18: postgres::PgPoolOptions,
19: pool::PoolConnection,
20: };
21: use stacked_errors::{
22: Result,
23: StackableErr,
24: bail,
25: };
26:
27: #[derive(sqlx::FromRow, Debug)]
28: pub struct List {
29: pub source_id: i32,
30: pub channel: String,
31: pub enabled: bool,
32: pub url: String,
33: pub iv_hash: Option<String>,
34: pub url_re: Option<String>,
35: }
36:
37: impl fmt::Display for List {
38: fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
39: write!(f, "#{} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
40: match self.enabled {
41: true => "š enabled",
42: false => "ā disabled",
43: }, self.url)?;
44: if let Some(iv_hash) = &self.iv_hash {
45: write!(f, "\nIV: `{iv_hash}`")?;
46: }
47: if let Some(url_re) = &self.url_re {
48: write!(f, "\nRE: `{url_re}`")?;
49: }
50: Ok(())
51: }
52: }
53:
54: #[derive(sqlx::FromRow, Debug)]
55: pub struct Source {
56: pub channel_id: i64,
57: pub url: String,
58: pub iv_hash: Option<String>,
59: pub owner: i64,
60: pub url_re: Option<String>,
61: }
62:
63: #[derive(sqlx::FromRow)]
64: pub struct Queue {
65: pub source_id: Option<i32>,
66: pub next_fetch: Option<DateTime<Local>>,
67: pub owner: Option<i64>,
68: }
69:
70: #[derive(Clone)]
71: pub struct Db (
72: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
73: );
74:
75: impl Db {
76: pub fn new (pguri: &str) -> Result<Db> {
77: Ok(Db (
78: Arc::new(Mutex::new(PgPoolOptions::new()
79: .max_connections(5)
80: .acquire_timeout(std::time::Duration::new(300, 0))
81: .idle_timeout(std::time::Duration::new(60, 0))
82: .connect_lazy(pguri)
83: .stack()?)),
84: ))
85: }
86:
87: pub async fn begin(&self) -> Result<Conn> {
88: let pool = self.0.lock_arc().await;
89: let conn = Conn ( pool.acquire().await.stack()? );
90: Ok(conn)
91: }
92: }
93:
94: pub struct Conn (
95: PoolConnection<Postgres>,
96: );
97:
98: impl Conn {
99: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
100: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
101: .bind(source_id)
102: .bind(date)
103: .bind(post_url)
104: .execute(&mut *self.0).await.stack()?;
105: Ok(())
106: }
107:
108: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
109: where I: Into<i64> {
110: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
111: .bind(source_id)
112: .bind(owner.into())
113: .execute(&mut *self.0).await.stack()?.rows_affected() {
114: 0 => { Ok("No data found found.".into()) },
115: x => { Ok(format!("{x} posts purged.").into()) },
116: }
117: }
118:
119: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
120: where I: Into<i64> {
121: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
122: .bind(source_id)
123: .bind(owner.into())
124: .execute(&mut *self.0).await.stack()?.rows_affected() {
125: 0 => { Ok("No data found found.".into()) },
126: x => { Ok(format!("{x} sources removed.").into()) },
127: }
128: }
129:
130: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
131: where I: Into<i64> {
132: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
133: .bind(source_id)
134: .bind(owner.into())
135: .execute(&mut *self.0).await.stack()?.rows_affected() {
136: 1 => { Ok("Source disabled.") },
137: 0 => { Ok("Source not found.") },
138: _ => { bail!("Database error.") },
139: }
140: }
141:
142: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
143: where I: Into<i64> {
144: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
145: .bind(source_id)
146: .bind(owner.into())
147: .execute(&mut *self.0).await.stack()?.rows_affected() {
148: 1 => { Ok("Source enabled.") },
149: 0 => { Ok("Source not found.") },
150: _ => { bail!("Database error.") },
151: }
152: }
153:
154: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
155: where I: Into<i64> {
156: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
157: .bind(post_url)
158: .bind(id.into())
159: .fetch_one(&mut *self.0).await.stack()?;
160: let exists: Option<bool> = row.try_get("exists").stack()?;
161: Ok(exists)
162: }
163:
164: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
44575a91d3 2025-07-09 165: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
166: .fetch_all(&mut *self.0).await.stack()?;
167: Ok(block)
168: }
169:
170: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
171: where I: Into<i64> {
172: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
173: .bind(owner.into())
174: .fetch_all(&mut *self.0).await.stack()?;
175: Ok(source)
176: }
177:
178: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
179: where I: Into<i64> {
180: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
181: .bind(owner.into())
182: .bind(id)
183: .fetch_optional(&mut *self.0).await.stack()?;
184: Ok(source)
185: }
186:
187: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
188: where I: Into<i64> {
189: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
190: .bind(id)
191: .bind(owner.into())
192: .fetch_one(&mut *self.0).await.stack()?;
193: Ok(source)
194: }
195:
196: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
197: where I: Into<i64> {
198: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
199: .bind(id.into())
200: .execute(&mut *self.0).await.stack()?;
201: Ok(())
202: }
203:
204: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
205: where I: Into<i64> {
206: match match update {
207: Some(id) => {
208: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
209: .bind(id)
210: },
211: None => {
212: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
213: },
214: }
215: .bind(channel_id)
216: .bind(url)
217: .bind(iv_hash)
218: .bind(owner.into())
219: .bind(channel)
220: .bind(url_re)
221: .execute(&mut *self.0).await
222: {
223: Ok(_) => Ok(match update {
224: Some(_) => "Channel updated.",
225: None => "Channel added.",
226: }),
227: Err(sqlx::Error::Database(err)) => {
228: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
229: Some("_bt_check_unique", ) => {
230: Ok("Duplicate key.")
231: },
232: Some(_) => {
233: Ok("Database error.")
234: },
235: None => {
236: Ok("No database error extracted.")
237: },
238: }
239: },
240: Err(err) => {
241: bail!("Sorry, unknown error:\n{err:#?}\n");
242: },
243: }
244: }
245: }