-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis_pool.v
161 lines (134 loc) · 2.89 KB
/
redis_pool.v
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
module vredis
import sync
import time
const err_pool_exhausted = error('vredis: connection pool exhausted')
const err_conn_closed = error('vredis: connection closed')
const err_conn_no_active = error('vredis: client no active')
const err_read_message = error('vredis: read message error')
type DialFn = fn () !&Redis
// PoolOpt Struct representing the options for a connection pool.
@[params]
pub struct PoolOpt {
pub:
dial DialFn = unsafe { nil } // Function used to establish a connection.
max_active int = 10 // Maximum number of active connections allowed in the pool.
idle_timeout i64 = 600 // Maximum time in seconds that an idle connection can stay in the pool.
max_conn_life_time i64 = 600 // Maximum time in seconds that a connection can stay alive.
test_on_borrow fn (&ActiveRedisConn) ! = unsafe { nil } // Function used to test a connection before borrowing it from the pool.
}
pub struct Pool {
sync.Once
mut:
opt PoolOpt
close bool
connections chan &ActiveRedisConn
mu sync.Mutex
active u32
}
pub fn new_pool(opt PoolOpt) !&Pool {
if isnil(opt.dial) {
return error('invalid dial fn setting')
}
return &Pool{
opt: opt
connections: chan &ActiveRedisConn{cap: opt.max_active}
}
}
pub fn (mut p Pool) str() string {
p.mu.@lock()
defer {
p.mu.unlock()
}
return '&vredis.Pool{
active: ${p.active}
len: ${p.connections.len}
close: ${p.close}
opt: ${p.opt}
}'
}
pub fn (mut p Pool) get() !&ActiveRedisConn {
p.mu.@lock()
defer {
p.mu.unlock()
}
if p.close {
return vredis.err_conn_closed
}
if p.active >= p.opt.max_active {
return vredis.err_pool_exhausted
}
for {
select {
mut client := <-p.connections {
unix := time.now().unix()
if unix - client.active_time >= p.opt.max_conn_life_time {
client.close() or {}
continue
}
if unix - client.put_in_time >= p.opt.idle_timeout {
client.close() or {}
continue
}
if !isnil(p.opt.test_on_borrow) {
p.opt.test_on_borrow(client) or {
client.close() or {}
continue
}
}
client.is_active = true
return client
}
else {
mut client := p.opt.dial()!
p.active++
return &ActiveRedisConn{
active_time: time.now().unix()
pool: &p
Redis: client
}
}
}
}
return vredis.err_pool_exhausted
}
pub fn (mut p Pool) active_cnt() u32 {
p.mu.@lock()
defer {
p.mu.unlock()
}
return p.active
}
pub fn (mut p Pool) put(mut client ActiveRedisConn) {
p.mu.@lock()
defer {
p.active--
p.mu.unlock()
}
if p.close {
return
}
if !client.is_active {
return
}
client.is_active = false
select {
p.connections <- client {}
else {
client.close() or {}
}
}
}
pub fn (mut p Pool) close() {
p.close = true
for {
select {
mut client := <-p.connections {
client.close() or {}
}
else {
p.connections.close()
break
}
}
}
}