Lines of
offload/src/main.rs
from check-in b33171b487
that are changed by the sequence of edits moving toward
check-in f721596292:
1: extern crate config;
2: extern crate postgres;
3: extern crate redis;
4: extern crate regex;
5:
6: use postgres::{Connection, TlsMode};
7: use redis::Commands;
8: use regex::Regex;
9: use std::collections::HashMap;
10: use std::time::Duration;
11: use std::thread;
12:
13: fn main() {
14: loop {
15: let mut settings = config::Config::default();
16: settings.merge(config::File::with_name("offload")).expect("Can't read configuration file");
17:
18: let reddb = redis::Client::open("redis://127.0.0.1/").expect("Can't connect to the database");
19: let red = reddb.get_connection().expect("Can't initialize new connection");
20:
b33171b487 2018-06-01 21: let re = Regex::new("^([0-9A-Z]+)_([a-z./0-9-]+)_([0-9.]+)$").expect("Can't parse regexp");
22: let conn = Connection::connect(settings.get_str("pg").expect("Postgres connection absent in config"), TlsMode::None).expect("Can't connect to postgres");
23:
24: let schemas = settings.get_array("schemas").expect("Schema list not found in config").into_iter().map(|value| config::Value::into_str(value).expect("We require string here"));
25: for schema in schemas {
26: let data_key = schema.to_owned() + "_counter_pending";
27: let cache_key = schema.to_owned() + "_counter_pending_now";
28:
29: if red.exists(&cache_key).expect("Can't query cache existance") {
30: } else if red.exists(&data_key).expect("Can't query data existance") {
31: let _ : bool = red.rename_nx(&data_key, &cache_key).expect("Can't readd unflushed hash");
32: } else {
33: continue;
34: }
35:
36: let stats : HashMap<String, i16> = red.hgetall(&cache_key).expect("Can't query for stored stat keys");
37:
38: //println!("# {:?}", stats);
39:
40: let trans = conn.transaction().expect("Can't start transaction");
41: let stmt = trans.prepare(&format!("select {}.merge_counter($1::text, $2::text, ($3::text)::inet, $4::smallint);", &schema)).expect("Can't prepare statement");
42: for (client, count) in stats {
43: //println!("# {:?}: {:?}", &client, &count);
b33171b487 2018-06-01 44: let cap = re.captures(&client).expect("Client match failed");
45: //let addr = IpAddr::V4(cap[3].parse().expect("Can't parse IP"));
46: //println!("insert into x values({:?}, {:?}, {:?}, {:?});", &cap[1], &cap[2], &cap[3], &count);
47: stmt.execute(&[&cap[1].to_string(), &cap[2].to_string(), &cap[3].to_string(), &count]).expect("Can't execute prepared");
48: //trans.query(&format!("select {}.merge_counter($1::text, $2::text, $3::inet, $4::smallint);", &schema), &[&cap[1].to_string(), &cap[2].to_string(), &cap[3].to_string(), &count]).expect("Can't prepare statement");
49: }
50: trans.commit().expect("Can't commit transaction");
51: red.del(cache_key).expect("Can't remove stale key")
52: }
53: thread::sleep(Duration::new(settings.get("delay").expect("Delay specification absent in config"), 0));
54: }
55: }