lua-counter example

Check-in [b33171b487]
anonymous

Check-in [b33171b487]

Overview
Comment:update script, multiple schemas, rust offloader
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | master | trunk
Files: files | file ages | folders
SHA3-256: b33171b487379c32e85109920e2b1d72952b150200608c9da5d78e3296e12f50
User & Date: arcade@b1t.name on 2018-06-01 10:35:23.000
Other Links: branch diff | manifest | tags
Context
2018-10-28
13:32
update regexp to include uppercase letters Leaf check-in: f721596292 user: arcade tags: master, trunk
2018-06-01
10:35
update script, multiple schemas, rust offloader check-in: b33171b487 user: arcade@b1t.name tags: master, trunk
2014-07-03
14:54
initial commit check-in: 38ec946cba user: arcade@b1t.name tags: master, trunk
Changes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

20
21
22
location = /postgres {
	internal;
	postgres_query $echo_request_body;
	postgres_pass counter;
}

location = /sleep {
	internal;
	echo_sleep 1;
}

location = /counter {
	default_type application/json;
	userid on;
	userid_name uid;
	userid_expires 365d;
	userid_p3p 'policyref="/w3c/p3p.xml", CP="NOI CUR ADM PSA OUR STP STA"';

	content_by_lua_file '/path/to/counter.lua';

	expires epoch;
	keepalive_timeout 0;
}






<
<
<
<
<







|
>

<

1
2
3
4
5
6





7
8
9
10
11
12
13
14
15
16

17
location = /postgres {
	internal;
	postgres_query $echo_request_body;
	postgres_pass counter;
}






location = /counter {
	default_type application/json;
	userid on;
	userid_name uid;
	userid_expires 365d;
	userid_p3p 'policyref="/w3c/p3p.xml", CP="NOI CUR ADM PSA OUR STP STA"';

	content_by_lua_file '/home/arcade/work/admin/nginx/counter.content.lua';
	#log_by_lua_file '/home/arcade/work/admin/nginx/counter.log.lua';
	expires epoch;

}
Added counter.log.lua version [08105b55ed].
1
2
3
4
5
6
7
8
9


10
11
12
13
14
15
16
17
18
19

20
21
22
23

24
25
26
27
28
29
30
31
32
33

34

35
36
37
38


39
40
41

42
43
44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65




66
67
68
69

70
71


72











73

74



75



76
77

78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
-- module counter - counts some junk

local today = os.date('*t')
local timestamp = os.time{year = today['year'], month = today['month'], day = today['day']}
local referer = '-';

if ngx.var.http_referer ~= nil then
	_, _, referer = string.find(ngx.var.http_referer, '^https?://([%w%.]+)')
end



local key = 'stat_counter_' .. timestamp .. '_' .. referer

-- connect to redis
local redis = require 'resty.redis'
local red = redis:new()
local ok, err = red:connect('127.0.0.1', 6379)
if not ok then
	ngx.log(ngx.ERR, 'redis connection failed: ', err)
	ngx.exit(500)

end

local cjson = require 'cjson'


-- decodes values from array into dict
function decode_bulk(reply)
	local data = {}
	for j=1, #reply, 2 do
		data[reply[j] ] = reply[j+1]
	end
	return(data)
end

local data, empty

empty, err = red:hsetnx(key, 'today', 0)

if not empty then
	ngx.log(ngx.ERR, 'redis query failed: ', err)
	ngx.exit(500)
end



if empty == 0 then
	data, err = red:hgetall(key)

	if not data then
		ngx.log(ngx.ERR, 'redis query failed: ', err)
		ngx.exit(500)
	end
end

if empty == 1 or data.whole == nil then
	-- postgres fallback
	result = ngx.location.capture('/postgres', {
		method = ngx.HTTP_PUT,
		body = "select * from get_stats('" .. referer .. "') as (today int, lastday int, week bigint, whole bigint);"
	})
	if result.status ~= 200 or not result.body then

		ngx.log(ngx.ERR, 'postgres access failed')
		ngx.exit(500)
	else
		local unrds = require "rds.parser"
		local res, err = unrds.parse(result.body)
		if res == nil then
			ngx.log(ngx.ERR, 'failed to obtain data: ' .. err)
			ngx.exit(500)
		else
			data = res.resultset[1]
			local req = {key}





			for name, value in pairs(data) do
				if value == unrds.null then
					value = 0

				end
				if name ~= 'today' then


					table.insert(req, name)











					table.insert(req, value)

				end



			end




			red:init_pipeline(3)

			red:hmset{req}
			red:expire(key, 129600)
			red:hincrby(key, 'today', data.today)
			res, err = red:commit_pipeline()
			if not res then
				ngx.log(ngx.ERR, 'redis pipeline failed: ', err)
				ngx.exit(500)
			end
		end
	end
end

data.today = data.today + 1
ngx.say(cjson.encode(data))
ngx.eof()

res, err = red:hincrby(key, 'today', 1)
local uid = ''

if ngx.var.uid_got ~= nil and string.find(ngx.var.uid_got, 'uid=') == 1 then
	uid = string.sub(ngx.var.uid_got, 5)
elseif ngx.var.uid_set ~= nil and string.find(ngx.var.uid_set, 'uid=') == 1 then
	uid = string.sub(ngx.var.uid_set, 5)
end

local hit_key = uid .. '_' .. referer .. '_' .. ngx.var.remote_addr


red:init_pipeline(5)
red:multi()
red:hincrby('stat_counter_pending', hit_key, 1)
red:expire('stat_counter_pending', 60)
red:renamenx('stat_counter_pending', 'stat_counter_pending_tmp')
red:exec()
renamed, err = red:commit_pipeline()
if not renamed then
	ngx.log(ngx.ERR, 'redis multi failed: ', err)
	ngx.exit(500)
end

local continue = false
if tonumber(renamed[5][3]) == 1 then
	continue = true
end

while continue do
	ngx.location.capture('/sleep')

	local data
	data, err = red:hgetall('stat_counter_pending_tmp')
	if not data then
		ngx.log(ngx.ERR, 'redis request failed: ', err)
		ngx.exit(500)
	end

	for name, value in pairs(decode_bulk(data)) do
		local fields = {}
		name:gsub("[^_]+", function(c) fields[#fields + 1] = c end)
		fields[#fields + 1] = value
		local result = ngx.location.capture('/postgres', {
			method = ngx.HTTP_PUT,
			-- prepare select? XXX
			body = "select merge_counter('" .. fields[1] .. "'::text, '" .. fields[2] .. "'::text, '" .. fields[3] .. "'::inet, '" .. fields[4] .. "'::smallint);"
		})
		if result.status ~= 200 or not result.body then
			ngx.log(ngx.ERR, 'postgres access failed')
			ngx.exit(500)
		end
	end

	red:init_pipeline(5)
	red:multi()
	red:del('stat_counter_pending_tmp')
	red:exists('stat_counter_pending')
	red:renamenx('stat_counter_pending', 'stat_counter_pending_tmp')
	red:exec()
	renamed, err = red:commit_pipeline()
	if not renamed then
		ngx.log(ngx.ERR, 'redis multi failed: ', err)
		ngx.exit(500)
	end

	if tonumber(renamed[5][1] == 0) or tonumber(renamed[5][2]) == 0 then
		continue = false
	end
end


<
<
|

<
<
<
>
>

|

|
<
|
|
|
|
|
>
|
|
<

>
|
<




|


|
>
|
>
|
|
|
|
>
>

<
<
>
|
|
|



|
|
|
|
|
|
|
>
|
<
|
|
|
<
|
|
<
<
|
>
>
>
>
|
|
|
|
>
|
|
>
>
|
>
>
>
>
>
>
>
>
>
>
>
|
>
|
>
>
>
|
>
>
>

|
>
|
|
|
|
|
|
|
|
|
|
|

<
|
|

<
|

|
|
|
|
|

|
>

|
|
|
|
<
|
|
|
|
|
|
<
<
<
<


<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
1
2


3
4



5
6
7
8
9
10

11
12
13
14
15
16
17
18

19
20
21

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

56
57
58

59
60


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

112
113
114

115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

130
131
132
133
134
135




136
137


138






































-- module counter - counts some junk



local _counter = {}




local redis = require 'resty.redis'
local cjson = require 'cjson'

local closed = false

function _counter.exit()

	if not closed then
		local ok, err = _counter.red:set_keepalive(60000, 10)
		if not ok then
			ngx.log(ngx.ERR, "redis keepalive: " .. err)
		end
		closed = true
	end
end


function _counter.decode_bulk(reply)
	-- decodes values from array into dict

	local data = {}
	for j=1, #reply, 2 do
		data[reply[j] ] = reply[j+1]
	end
	return data
end

function _counter.connect()
	-- connect to redis
	_counter.red = redis:new()
	local ok, err = _counter.red:connect('127.0.0.1', 6379)
	if not ok then
		ngx.log(ngx.ERR, 'redis connection failed: ', err)
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end
	closed = false
end



function _counter.check_schema()
	if ngx.var.site_schema == nil then
		ngx.log(ngx.ERR, "No 'site_schema' specified");
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end
end

function _counter.update()
	_counter.check_schema()
	local today = os.date('*t')
	local timestamp = os.time{year = today['year'], month = today['month'], day = today['day']}
	local referer = '-';

	if ngx.var.http_referer ~= nil then
		_, _, referer = string.find(ngx.var.http_referer, '^https?://([^?&]+)')
		--ngx.log(ngx.ERR, "Referer: " .. referer)

	end

	local key = ngx.var.site_schema .. '_counter_' .. timestamp .. '_' .. referer

	-- ngx.log(ngx.ERR, "Using key: " .. key)



	local res, err
	_counter.red:init_pipeline(4)
	_counter.red:multi()
	_counter.red:hincrby(key, 'today', 1)
	_counter.red:hgetall(key)
	_counter.red:exec()
	res, err = _counter.red:commit_pipeline()
	if not res then
		ngx.log(ngx.ERR, 'redis pipeline failed: ', err)
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end

	local data = _counter.red:array_to_hash(res[4][2])
	-- ngx.log(ngx.ERR, "Got data: " .. cjson.encode(data))

	if tonumber(data.today) == 1 then
		ngx.log(ngx.ERR, "Reading postgres")
		-- postgres fallback
		result = ngx.location.capture('/postgres', {
			method = ngx.HTTP_PUT,
			body = "select * from " .. ngx.var.site_schema .. ".get_stats('" .. referer .. "') as (today int, lastday int, week bigint, whole bigint);"
		})
		if result.status ~= 200 or not result.body then
			ngx.log(ngx.ERR, 'postgres access failed')
			ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
		else
			local unrds = require "rds.parser"
			local res, err = unrds.parse(result.body)

			if res == nil then
				ngx.log(ngx.ERR, 'failed to obtain data: ' .. err)
				ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
			else
				data = res.resultset[1]
				data.today = data.today + 1
				ngx.log(ngx.ERR, "Got data: " .. cjson.encode(data))

				_counter.red:init_pipeline(4)
				_counter.red:multi()
				_counter.red:hmset(key, data)
				_counter.red:expire(key, 129600)
				_counter.red:exec()
				res, err = _counter.red:commit_pipeline()
				if not res then
					ngx.log(ngx.ERR, 'redis pipeline failed: ', err)
					ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
				end
			end
		end
	end


	ngx.say(cjson.encode(data))
	ngx.eof()


	local uid = ''

	if ngx.var.uid_got ~= nil and string.find(ngx.var.uid_got, 'uid=') == 1 then
		uid = string.sub(ngx.var.uid_got, 5)
	elseif ngx.var.uid_set ~= nil and string.find(ngx.var.uid_set, 'uid=') == 1 then
		uid = string.sub(ngx.var.uid_set, 5)
	end

	local hit_key = uid .. '_' .. referer .. '_' .. ngx.var.remote_addr
	local key_pending = ngx.var.site_schema .. '_counter_pending'

	_counter.red:init_pipeline(4)
	_counter.red:multi()
	_counter.red:hincrby(key_pending, hit_key, 1)
	_counter.red:expire(key_pending, 604800)

	_counter.red:exec()
	res, err = _counter.red:commit_pipeline()
	if not res then
		ngx.log(ngx.ERR, 'redis transaction failed: ', err)
		ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
	end




end



return _counter






































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

77
SET search_path = auto, pg_catalog;
BEGIN;

CREATE TABLE counter (
	uid text NOT NULL,
	referer text DEFAULT '-' NOT NULL,
	ip inet NOT NULL,
	day date DEFAULT now() NOT NULL,
	count integer DEFAULT 0 NOT NULL
);

CREATE TABLE counter_sum (
	referer text DEFAULT '-'::text NOT NULL,
	day date DEFAULT now() NOT NULL,
	count integer DEFAULT 0 NOT NULL
);

CREATE or replace FUNCTION get_stats(target text) RETURNS record LANGUAGE sql AS $_$
select 
	coalesce((select count from counter_sum where day = current_date and referer = $1), 0) as today,
	coalesce((select count from counter_sum where day = (current_date - interval '1 day')::date and referer = $1), 0) as lastday,
	coalesce((select sum(count) from counter_sum
		where date_trunc('week', current_date) = date_trunc('week', day) and day <> current_date and referer = $1
		group by referer), 0) as week,
	coalesce((select sum(count) from counter_sum where day <> current_date and referer = $1 group by referer), 0) as whole
$_$;

CREATE FUNCTION counter_sum__change_trigger() RETURNS trigger LANGUAGE plpgsql AS $$
declare
	delta_count smallint;
begin
	if (TG_OP = 'UPDATE') then
		if (old.referer <> new.referer or old.day <> new.day) then
			raise exception 'Changing referer or day is prohibited';
		end if;
		delta_count = new.count - old.count;
	elsif (TG_OP = 'INSERT') then
		delta_count = new.count;
	elsif (TG_OP = 'DELETE') then
		raise exception 'Deleting counters prohibited';
	end if;
	<<insert_update>> loop
		update counter_sum set count = count + delta_count
			where referer = new.referer and day = new.day;
		exit insert_update when found;
		begin
			insert into counter_sum (referer, day, count) values (new.referer, new.day, delta_count);
			exit insert_update;
			exception when unique_violation then
		end;
	end loop insert_update;
	return new;
end; $$;

CREATE FUNCTION merge_counter(merge_uid text, merge_referer text, merge_ip inet, merge_count smallint) RETURNS boolean LANGUAGE plpgsql AS $$
BEGIN
	<<insert_update>> LOOP
		UPDATE counter SET count = count + merge_count
			WHERE uid = merge_uid and ip = merge_ip and referer = merge_referer and day = current_date;
		exit insert_update when found;
		BEGIN
			INSERT INTO counter (uid, referer, ip, count) VALUES (merge_uid, merge_referer, merge_ip, merge_count);
			EXIT insert_update;
			EXCEPTION WHEN unique_violation THEN
		END;
	END LOOP insert_update;
	return true;
END;
$$;

CREATE INDEX counter__date ON counter USING btree (day);
CREATE UNIQUE INDEX counter__uid_nreferer_ip_date ON counter (uid, ip, day) WHERE (referer IS NULL);
CREATE UNIQUE INDEX counter__uid_referer_ip_date ON counter (uid, referer, ip, day);
create unique index counter_sum__referer_day ON counter_sum (referer, day);
CREATE TRIGGER counter__change AFTER INSERT OR DELETE OR UPDATE ON counter FOR EACH ROW EXECUTE PROCEDURE counter_sum__change_trigger();


end;
<
<
<
|







|







|
|
|


|


|














|



|







|


|



|








|
|
|
|
<

>
|



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

72
73
74



CREATE TABLE if not exists counter (
	uid text NOT NULL,
	referer text DEFAULT '-' NOT NULL,
	ip inet NOT NULL,
	day date DEFAULT now() NOT NULL,
	count integer DEFAULT 0 NOT NULL
);

CREATE TABLE if not exists counter_sum (
	referer text DEFAULT '-'::text NOT NULL,
	day date DEFAULT now() NOT NULL,
	count integer DEFAULT 0 NOT NULL
);

CREATE or replace FUNCTION get_stats(target text) RETURNS record LANGUAGE sql AS $_$
select 
	coalesce((select count from limbo.counter_sum where day = current_date and referer = $1), 0) as today,
	coalesce((select count from limbo.counter_sum where day = (current_date - interval '1 day')::date and referer = $1), 0) as lastday,
	coalesce((select sum(count) from limbo.counter_sum
		where date_trunc('week', current_date) = date_trunc('week', day) and day <> current_date and referer = $1
		group by referer), 0) as week,
	coalesce((select sum(count) from limbo.counter_sum where day <> current_date and referer = $1 group by referer), 0) as whole
$_$;

CREATE or replace FUNCTION counter_sum__change_trigger() RETURNS trigger LANGUAGE plpgsql AS $$
declare
	delta_count smallint;
begin
	if (TG_OP = 'UPDATE') then
		if (old.referer <> new.referer or old.day <> new.day) then
			raise exception 'Changing referer or day is prohibited';
		end if;
		delta_count = new.count - old.count;
	elsif (TG_OP = 'INSERT') then
		delta_count = new.count;
	elsif (TG_OP = 'DELETE') then
		raise exception 'Deleting counters prohibited';
	end if;
	<<insert_update>> loop
		update limbo.counter_sum set count = count + delta_count
			where referer = new.referer and day = new.day;
		exit insert_update when found;
		begin
			insert into limbo.counter_sum (referer, day, count) values (new.referer, new.day, delta_count);
			exit insert_update;
			exception when unique_violation then
		end;
	end loop insert_update;
	return new;
end; $$;

CREATE or replace FUNCTION merge_counter(merge_uid text, merge_referer text, merge_ip inet, merge_count smallint) RETURNS boolean LANGUAGE plpgsql AS $$
BEGIN
	<<insert_update>> LOOP
		UPDATE limbo.counter SET count = count + merge_count
			WHERE uid = merge_uid and ip = merge_ip and referer = merge_referer and day = current_date;
		exit insert_update when found;
		BEGIN
			INSERT INTO limbo.counter (uid, referer, ip, count) VALUES (merge_uid, merge_referer, merge_ip, merge_count);
			EXIT insert_update;
			EXCEPTION WHEN unique_violation THEN
		END;
	END LOOP insert_update;
	return true;
END;
$$;

CREATE INDEX if not exists counter__date ON counter USING btree (day);
CREATE UNIQUE INDEX if not exists counter__uid_nreferer_ip_date ON counter (uid, ip, day) WHERE (referer IS NULL);
CREATE UNIQUE INDEX if not exists counter__uid_referer_ip_date ON counter (uid, referer, ip, day);
create unique index if not exists counter_sum__referer_day ON counter_sum (referer, day);


drop trigger if exists counter__change on counter;
CREATE TRIGGER counter__change AFTER INSERT OR DELETE OR UPDATE ON counter FOR EACH ROW EXECUTE PROCEDURE counter_sum__change_trigger();