lua-counter example

Check-in [b33171b487]
anonymous

Check-in [b33171b487]

Overview
Comment:update script, multiple schemas, rust offloader
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | master | trunk
Files: files | file ages | folders
SHA3-256: b33171b487379c32e85109920e2b1d72952b150200608c9da5d78e3296e12f50
User & Date: arcade@b1t.name on 2018-06-01 10:35:23.000
Other Links: branch diff | manifest | tags
Context
2018-10-28
13:32
update regexp to include uppercase letters Leaf check-in: f721596292 user: arcade tags: master, trunk
2018-06-01
10:35
update script, multiple schemas, rust offloader check-in: b33171b487 user: arcade@b1t.name tags: master, trunk
2014-07-03
14:54
initial commit check-in: 38ec946cba user: arcade@b1t.name tags: master, trunk
Changes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


20
21
22
1
2
3
4
5
6





7
8
9
10
11
12
13

14
15
16

17






-
-
-
-
-







-
+
+

-

location = /postgres {
	internal;
	postgres_query $echo_request_body;
	postgres_pass counter;
}

location = /sleep {
	internal;
	echo_sleep 1;
}

location = /counter {
	default_type application/json;
	userid on;
	userid_name uid;
	userid_expires 365d;
	userid_p3p 'policyref="/w3c/p3p.xml", CP="NOI CUR ADM PSA OUR STP STA"';

	content_by_lua_file '/path/to/counter.lua';
	content_by_lua_file '/home/arcade/work/admin/nginx/counter.content.lua';
	#log_by_lua_file '/home/arcade/work/admin/nginx/counter.log.lua';
	expires epoch;
	keepalive_timeout 0;
}
Added counter.log.lua version [08105b55ed].
1
2
3
4
5

6
7
8
9


10
11

12
13

14
15
16
17
18
19
20
21








22
23

24

25
26
27
28
29
30

31
32
33
34
35
36
37
38










39
40
41
42
43
44




45
46
47
48
49
50
51
52
53
54
55









56
57
58
59



60
61
62


63
64
65
66
67
68
69
70
71
72
73
74
75




































76
77
78
79
80
81
82
83
84
85
86
87
88













89
90
91
92


93
94
95

96
97
98
99
100
101





102
103


104
105
106
107
108




109
110
111
112
113
114
115






116
117
118
119
120
121
122
123
124

125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
1
2



3
4



5
6
7

8
9

10








11
12
13
14
15
16
17
18

19
20

21

22
23
24
25

26
27
28






29
30
31
32
33
34
35
36
37
38
39





40
41
42
43
44
45
46








47
48
49
50
51
52
53
54
55




56
57
58



59
60













61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97












98
99
100
101
102
103
104
105
106
107
108
109
110
111



112
113
114


115
116





117
118
119
120
121
122

123
124
125




126
127
128
129







130
131
132
133
134
135




136
137



138








































-
-
-
+

-
-
-
+
+

-
+

-
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-

+
-
+
-




-
+


-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+

-
-
-
-
-
+
+
+
+



-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
-
+
+

-
-
+

-
-
-
-
-
+
+
+
+
+

-
+
+

-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
-


-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-- module counter - counts some junk

local today = os.date('*t')
local timestamp = os.time{year = today['year'], month = today['month'], day = today['day']}
local referer = '-';
local _counter = {}

if ngx.var.http_referer ~= nil then
	_, _, referer = string.find(ngx.var.http_referer, '^https?://([%w%.]+)')
end
local redis = require 'resty.redis'
local cjson = require 'cjson'

local key = 'stat_counter_' .. timestamp .. '_' .. referer
local closed = false

-- connect to redis
function _counter.exit()
local redis = require 'resty.redis'
local red = redis:new()
local ok, err = red:connect('127.0.0.1', 6379)
if not ok then
	ngx.log(ngx.ERR, 'redis connection failed: ', err)
	ngx.exit(500)
end

	if not closed then
		local ok, err = _counter.red:set_keepalive(60000, 10)
		if not ok then
			ngx.log(ngx.ERR, "redis keepalive: " .. err)
		end
		closed = true
	end
end
local cjson = require 'cjson'

function _counter.decode_bulk(reply)
-- decodes values from array into dict
	-- decodes values from array into dict
function decode_bulk(reply)
	local data = {}
	for j=1, #reply, 2 do
		data[reply[j] ] = reply[j+1]
	end
	return(data)
	return data
end

local data, empty
empty, err = red:hsetnx(key, 'today', 0)
if not empty then
	ngx.log(ngx.ERR, 'redis query failed: ', err)
	ngx.exit(500)
end
function _counter.connect()
	-- connect to redis
	_counter.red = redis:new()
	local ok, err = _counter.red:connect('127.0.0.1', 6379)
	if not ok then
		ngx.log(ngx.ERR, 'redis connection failed: ', err)
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end
	closed = false
end

if empty == 0 then
	data, err = red:hgetall(key)
	if not data then
		ngx.log(ngx.ERR, 'redis query failed: ', err)
		ngx.exit(500)
function _counter.check_schema()
	if ngx.var.site_schema == nil then
		ngx.log(ngx.ERR, "No 'site_schema' specified");
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end
end

if empty == 1 or data.whole == nil then
	-- postgres fallback
	result = ngx.location.capture('/postgres', {
		method = ngx.HTTP_PUT,
		body = "select * from get_stats('" .. referer .. "') as (today int, lastday int, week bigint, whole bigint);"
	})
	if result.status ~= 200 or not result.body then
		ngx.log(ngx.ERR, 'postgres access failed')
function _counter.update()
	_counter.check_schema()
	local today = os.date('*t')
	local timestamp = os.time{year = today['year'], month = today['month'], day = today['day']}
	local referer = '-';

	if ngx.var.http_referer ~= nil then
		_, _, referer = string.find(ngx.var.http_referer, '^https?://([^?&]+)')
		--ngx.log(ngx.ERR, "Referer: " .. referer)
		ngx.exit(500)
	else
		local unrds = require "rds.parser"
		local res, err = unrds.parse(result.body)
	end

	local key = ngx.var.site_schema .. '_counter_' .. timestamp .. '_' .. referer
		if res == nil then
			ngx.log(ngx.ERR, 'failed to obtain data: ' .. err)
			ngx.exit(500)
	-- ngx.log(ngx.ERR, "Using key: " .. key)

		else
			data = res.resultset[1]
			local req = {key}

			for name, value in pairs(data) do
				if value == unrds.null then
					value = 0
				end
				if name ~= 'today' then
					table.insert(req, name)
					table.insert(req, value)
				end
			end
	local res, err
	_counter.red:init_pipeline(4)
	_counter.red:multi()
	_counter.red:hincrby(key, 'today', 1)
	_counter.red:hgetall(key)
	_counter.red:exec()
	res, err = _counter.red:commit_pipeline()
	if not res then
		ngx.log(ngx.ERR, 'redis pipeline failed: ', err)
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end

	local data = _counter.red:array_to_hash(res[4][2])
	-- ngx.log(ngx.ERR, "Got data: " .. cjson.encode(data))

	if tonumber(data.today) == 1 then
		ngx.log(ngx.ERR, "Reading postgres")
		-- postgres fallback
		result = ngx.location.capture('/postgres', {
			method = ngx.HTTP_PUT,
			body = "select * from " .. ngx.var.site_schema .. ".get_stats('" .. referer .. "') as (today int, lastday int, week bigint, whole bigint);"
		})
		if result.status ~= 200 or not result.body then
			ngx.log(ngx.ERR, 'postgres access failed')
			ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
		else
			local unrds = require "rds.parser"
			local res, err = unrds.parse(result.body)

			if res == nil then
				ngx.log(ngx.ERR, 'failed to obtain data: ' .. err)
				ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
			else
				data = res.resultset[1]
				data.today = data.today + 1
				ngx.log(ngx.ERR, "Got data: " .. cjson.encode(data))

			red:init_pipeline(3)
			red:hmset{req}
			red:expire(key, 129600)
			red:hincrby(key, 'today', data.today)
			res, err = red:commit_pipeline()
			if not res then
				ngx.log(ngx.ERR, 'redis pipeline failed: ', err)
				ngx.exit(500)
			end
		end
	end
end
				_counter.red:init_pipeline(4)
				_counter.red:multi()
				_counter.red:hmset(key, data)
				_counter.red:expire(key, 129600)
				_counter.red:exec()
				res, err = _counter.red:commit_pipeline()
				if not res then
					ngx.log(ngx.ERR, 'redis pipeline failed: ', err)
					ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
				end
			end
		end
	end

data.today = data.today + 1
ngx.say(cjson.encode(data))
ngx.eof()
	ngx.say(cjson.encode(data))
	ngx.eof()

res, err = red:hincrby(key, 'today', 1)
local uid = ''
	local uid = ''

if ngx.var.uid_got ~= nil and string.find(ngx.var.uid_got, 'uid=') == 1 then
	uid = string.sub(ngx.var.uid_got, 5)
elseif ngx.var.uid_set ~= nil and string.find(ngx.var.uid_set, 'uid=') == 1 then
	uid = string.sub(ngx.var.uid_set, 5)
end
	if ngx.var.uid_got ~= nil and string.find(ngx.var.uid_got, 'uid=') == 1 then
		uid = string.sub(ngx.var.uid_got, 5)
	elseif ngx.var.uid_set ~= nil and string.find(ngx.var.uid_set, 'uid=') == 1 then
		uid = string.sub(ngx.var.uid_set, 5)
	end

local hit_key = uid .. '_' .. referer .. '_' .. ngx.var.remote_addr
	local hit_key = uid .. '_' .. referer .. '_' .. ngx.var.remote_addr
	local key_pending = ngx.var.site_schema .. '_counter_pending'

red:init_pipeline(5)
red:multi()
red:hincrby('stat_counter_pending', hit_key, 1)
red:expire('stat_counter_pending', 60)
	_counter.red:init_pipeline(4)
	_counter.red:multi()
	_counter.red:hincrby(key_pending, hit_key, 1)
	_counter.red:expire(key_pending, 604800)
red:renamenx('stat_counter_pending', 'stat_counter_pending_tmp')
red:exec()
renamed, err = red:commit_pipeline()
if not renamed then
	ngx.log(ngx.ERR, 'redis multi failed: ', err)
	ngx.exit(500)
end
	_counter.red:exec()
	res, err = _counter.red:commit_pipeline()
	if not res then
		ngx.log(ngx.ERR, 'redis transaction failed: ', err)
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end

local continue = false
if tonumber(renamed[5][3]) == 1 then
	continue = true
end

while continue do
	ngx.location.capture('/sleep')

return _counter
	local data
	data, err = red:hgetall('stat_counter_pending_tmp')
	if not data then
		ngx.log(ngx.ERR, 'redis request failed: ', err)
		ngx.exit(500)
	end

	for name, value in pairs(decode_bulk(data)) do
		local fields = {}
		name:gsub("[^_]+", function(c) fields[#fields + 1] = c end)
		fields[#fields + 1] = value
		local result = ngx.location.capture('/postgres', {
			method = ngx.HTTP_PUT,
			-- prepare select? XXX
			body = "select merge_counter('" .. fields[1] .. "'::text, '" .. fields[2] .. "'::text, '" .. fields[3] .. "'::inet, '" .. fields[4] .. "'::smallint);"
		})
		if result.status ~= 200 or not result.body then
			ngx.log(ngx.ERR, 'postgres access failed')
			ngx.exit(500)
		end
	end

	red:init_pipeline(5)
	red:multi()
	red:del('stat_counter_pending_tmp')
	red:exists('stat_counter_pending')
	red:renamenx('stat_counter_pending', 'stat_counter_pending_tmp')
	red:exec()
	renamed, err = red:commit_pipeline()
	if not renamed then
		ngx.log(ngx.ERR, 'redis multi failed: ', err)
		ngx.exit(500)
	end

	if tonumber(renamed[5][1] == 0) or tonumber(renamed[5][2]) == 0 then
		continue = false
	end
end
1
2
3
4

5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
20
21
22



23
24
25

26
27
28

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

44
45
46
47

48
49
50
51
52
53
54
55

56
57
58

59
60
61
62

63
64
65
66
67
68
69
70
71
72
73
74




75
76

77





1
2
3
4
5
6
7
8

9
10
11
12
13
14
15
16



17
18
19
20
21

22
23
24

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

40
41
42
43

44
45
46
47
48
49
50
51

52
53
54

55
56
57
58

59
60
61
62
63
64
65
66
67




68
69
70
71

72
73

74
-
-
-
-
+







-
+







-
-
-
+
+
+


-
+


-
+














-
+



-
+







-
+


-
+



-
+








-
-
-
-
+
+
+
+
-

+
-
+
SET search_path = auto, pg_catalog;
BEGIN;

CREATE TABLE counter (
CREATE TABLE if not exists counter (
	uid text NOT NULL,
	referer text DEFAULT '-' NOT NULL,
	ip inet NOT NULL,
	day date DEFAULT now() NOT NULL,
	count integer DEFAULT 0 NOT NULL
);

CREATE TABLE counter_sum (
CREATE TABLE if not exists counter_sum (
	referer text DEFAULT '-'::text NOT NULL,
	day date DEFAULT now() NOT NULL,
	count integer DEFAULT 0 NOT NULL
);

CREATE or replace FUNCTION get_stats(target text) RETURNS record LANGUAGE sql AS $_$
select 
	coalesce((select count from counter_sum where day = current_date and referer = $1), 0) as today,
	coalesce((select count from counter_sum where day = (current_date - interval '1 day')::date and referer = $1), 0) as lastday,
	coalesce((select sum(count) from counter_sum
	coalesce((select count from limbo.counter_sum where day = current_date and referer = $1), 0) as today,
	coalesce((select count from limbo.counter_sum where day = (current_date - interval '1 day')::date and referer = $1), 0) as lastday,
	coalesce((select sum(count) from limbo.counter_sum
		where date_trunc('week', current_date) = date_trunc('week', day) and day <> current_date and referer = $1
		group by referer), 0) as week,
	coalesce((select sum(count) from counter_sum where day <> current_date and referer = $1 group by referer), 0) as whole
	coalesce((select sum(count) from limbo.counter_sum where day <> current_date and referer = $1 group by referer), 0) as whole
$_$;

CREATE FUNCTION counter_sum__change_trigger() RETURNS trigger LANGUAGE plpgsql AS $$
CREATE or replace FUNCTION counter_sum__change_trigger() RETURNS trigger LANGUAGE plpgsql AS $$
declare
	delta_count smallint;
begin
	if (TG_OP = 'UPDATE') then
		if (old.referer <> new.referer or old.day <> new.day) then
			raise exception 'Changing referer or day is prohibited';
		end if;
		delta_count = new.count - old.count;
	elsif (TG_OP = 'INSERT') then
		delta_count = new.count;
	elsif (TG_OP = 'DELETE') then
		raise exception 'Deleting counters prohibited';
	end if;
	<<insert_update>> loop
		update counter_sum set count = count + delta_count
		update limbo.counter_sum set count = count + delta_count
			where referer = new.referer and day = new.day;
		exit insert_update when found;
		begin
			insert into counter_sum (referer, day, count) values (new.referer, new.day, delta_count);
			insert into limbo.counter_sum (referer, day, count) values (new.referer, new.day, delta_count);
			exit insert_update;
			exception when unique_violation then
		end;
	end loop insert_update;
	return new;
end; $$;

CREATE FUNCTION merge_counter(merge_uid text, merge_referer text, merge_ip inet, merge_count smallint) RETURNS boolean LANGUAGE plpgsql AS $$
CREATE or replace FUNCTION merge_counter(merge_uid text, merge_referer text, merge_ip inet, merge_count smallint) RETURNS boolean LANGUAGE plpgsql AS $$
BEGIN
	<<insert_update>> LOOP
		UPDATE counter SET count = count + merge_count
		UPDATE limbo.counter SET count = count + merge_count
			WHERE uid = merge_uid and ip = merge_ip and referer = merge_referer and day = current_date;
		exit insert_update when found;
		BEGIN
			INSERT INTO counter (uid, referer, ip, count) VALUES (merge_uid, merge_referer, merge_ip, merge_count);
			INSERT INTO limbo.counter (uid, referer, ip, count) VALUES (merge_uid, merge_referer, merge_ip, merge_count);
			EXIT insert_update;
			EXCEPTION WHEN unique_violation THEN
		END;
	END LOOP insert_update;
	return true;
END;
$$;

CREATE INDEX counter__date ON counter USING btree (day);
CREATE UNIQUE INDEX counter__uid_nreferer_ip_date ON counter (uid, ip, day) WHERE (referer IS NULL);
CREATE UNIQUE INDEX counter__uid_referer_ip_date ON counter (uid, referer, ip, day);
create unique index counter_sum__referer_day ON counter_sum (referer, day);
CREATE INDEX if not exists counter__date ON counter USING btree (day);
CREATE UNIQUE INDEX if not exists counter__uid_nreferer_ip_date ON counter (uid, ip, day) WHERE (referer IS NULL);
CREATE UNIQUE INDEX if not exists counter__uid_referer_ip_date ON counter (uid, referer, ip, day);
create unique index if not exists counter_sum__referer_day ON counter_sum (referer, day);
CREATE TRIGGER counter__change AFTER INSERT OR DELETE OR UPDATE ON counter FOR EACH ROW EXECUTE PROCEDURE counter_sum__change_trigger();

drop trigger if exists counter__change on counter;
end;
CREATE TRIGGER counter__change AFTER INSERT OR DELETE OR UPDATE ON counter FOR EACH ROW EXECUTE PROCEDURE counter_sum__change_trigger();