Coverage

98%
262
257
5

cluster.js

97%
100
97
3
LineHitsSource
1/* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */
2
31"use strict";
4
51var util = require('util');
61var Emitter = require('events').EventEmitter;
71var Pool = require(__dirname + '/pool.js');
81var Queue = require('safequeue');
9
101var MAX_QUEUED_SIZE = 1000;
11
121var READONLY = 1;
131var WRITABLE = 2;
14
151var _QueueTimeoutError = function (msg) {
166 var e = new Error(msg);
176 e.name = 'QueueTimeout';
186 return e;
19};
20
211var _QueueIsFullError = function (msg) {
222 var e = new Error(msg);
232 e.name = 'QueueIsFull';
242 return e;
25};
26
27/* {{{ private function _readonly() */
281var _readonly = function (sql) {
2920 return sql.match(/^(SELECT|SHOW|DESC|DESCRIBE|KILL)\s+/i) ? true : false;
30};
31/* }}} */
32
33/* {{{ private function _remove() */
341var _remove = function (a, o) {
3510 var i = a.indexOf(o);
3610 if (i > -1) {
370 a.splice(i, 1);
38 }
39
4010 return a;
41};
42/* }}} */
43
441exports.create = function (options) {
45
46 /**
47 * @ 心跳SQL
48 */
494 var hbquery = 'SHOW VARIABLES LIKE "READ_ONLY"';
50
51 /* {{{ private function hbparse() */
52 /**
53 * parse heartbeat result
54 *
55 * @return Integer 0 : offline; 1 : readonly; 3 : writable
56 */
574 var hbparse = function (res) {
582 if (!res || !res.length) {
590 return 0;
60 }
61
622 var s = READONLY;
632 if (((res.shift() || {}).Value + '').match(/^(off)$/i)) {
642 s |= WRITABLE;
65 }
66
672 return s;
68 };
69 /* }}} */
70
714 var Cluster = function () {
724 Emitter.call(this);
73 };
744 util.inherits(Cluster, Emitter);
75
76 /**
77 * @ 连接池列表
78 */
794 var backups = {};
80
81 /**
82 * @ 读写列表
83 */
844 var rwlists = [];
85
86 /**
87 * @ 读写队列
88 */
894 var rwqueue = Queue.create({'timeout' : 0, 'maxitem' : MAX_QUEUED_SIZE});
904 rwqueue.on('timeout', function (item, timeout) {
915 (item[2])(_QueueTimeoutError('Query stays in the queue more than ' + timeout + ' ms'));
92 });
93
94 /**
95 * @ 只读列表
96 */
974 var rolists = [];
98
99 /**
100 * @ 只读队列
101 */
1024 var roqueue = Queue.create({'timeout' : 0, 'maxitem' : MAX_QUEUED_SIZE});
1034 roqueue.on('timeout', function (item, timeout) {
1041 (item[2])(_QueueTimeoutError('Query stays in the queue more than ' + timeout + ' ms'));
105 });
106
107 /* {{{ public prototype setHeartBeatQuery() */
1084 Cluster.prototype.setHeartBeatQuery = function (sql, parser) {
1092 Object.keys(backups).forEach(function (i) {
1101 backups[i].setHeartBeatQuery(sql);
111 });
1122 hbquery = sql;
1132 if ('function' === (typeof parser)) {
1142 hbparse = parser;
115 }
116 };
117 /* }}} */
118
119 /* {{{ private function checkQueue() */
1204 var checkQueue = function (queue, pool, max) {
1216 var max = (~~max) || 4;
1226 while (max > 0) {
12314 var s = queue.shift();
12414 if (!s || !s.length) {
1255 return;
126 }
1279 pool.query(s[0], s[1], s[2]);
1289 max--;
129 }
130
1311 if (queue.size() > 0) {
1321 process.nextTick(function () {
1331 checkQueue(queue, pool, max);
134 });
135 }
136 };
137 /* }}} */
138
139 /* {{{ public prototype addserver() */
1404 Cluster.prototype.addserver = function (config) {
1415 var p = Pool.create(options, config);
1425 var i = p._name();
1435 backups[i] = p;
144
1455 var _self = this;
1465 p.on('error', function (e) {
1474 _self.emit('error', e);
148 });
1495 p.on('busy', function (n, c) {
1509 _self.emit('busy', n, c, i);
151 });
152
1535 p.setHeartBeatQuery(hbquery);
1545 p.on('state', function (res) {
1558 var s = hbparse(res);
1568 if (!s) {
1575 rolists = _remove(rolists, i);
1585 rwlists = _remove(rwlists, i);
1595 return;
160 }
161
1623 if (rolists.indexOf(i) < 0) {
1633 checkQueue(roqueue, p);
1643 rolists.push(i);
165 }
1663 if ((WRITABLE & s) && rwlists.indexOf(i) < 0) {
1672 checkQueue(rwqueue, p);
1682 rwlists.push(i);
169 }
170 });
171 };
172 /* }}} */
173
174 /**
175 * @ 读计数器
176 */
1774 var rocount = 0;
178
179 /**
180 * @ 写计数器
181 */
1824 var rwcount = 0;
183
184 /* {{{ public function query() */
1854 Cluster.prototype.query = function (sql, tmout, callback) {
186
18720 if ('function' === (typeof tmout)) {
1884 callback = tmout;
1894 tmout = 0;
19020 };
191
19220 if (!_readonly(sql.sql || sql)) {
1936 if (rwlists.length < 1) {
1946 if (rwqueue.push([sql, tmout, callback], tmout) < 0) {
1951 callback(_QueueIsFullError('Too many queries queued'));
196 }
197 } else {
1980 backups[rwlists[(++rwcount) % rwlists.length]].query(sql, tmout, callback);
199 }
200 } else {
20114 if (rolists.length < 1) {
20211 if (roqueue.push([sql, tmout, callback], tmout) < 0) {
2031 callback(_QueueIsFullError('Too many queries queued'));
204 }
205 } else {
2063 backups[rolists[(++rocount) % rolists.length]].query(sql, tmout, callback);
207 }
208 }
209 };
210 /* }}} */
211
2124 return new Cluster();
213};
214

connection.js

98%
51
50
1
LineHitsSource
1/* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */
2
31"use strict";
4
51var util = require('util');
61var mysql = require('mysql-robin');
71var EventEmitter = require('events').EventEmitter;
81var sqlString = require('mysql-robin/lib/protocol/SqlString');
9
10/**
11 * @ Connection
12 */
131var Connection = function (options) {
14
1515 EventEmitter.call(this);
16
17 /**
18 * 0 : 未连接
19 * 1 : 正在连接
20 * 2 : 连接成功
21 * -1: 准备断开
22 */
2315 this._flag = 0;
2415 options.port = options.port || 3306;
2515 this._name = util.format('%s@%s:%d', options.user, options.host, options.port);
2615 this._conn = mysql.createConnection(options);
27
2815 var _self = this;
2915 this._conn.on('error', function (e) {
301 if (e && e.fatal && _self._flag > -1) {
311 _self.close();
32 }
331 _self.emit('error', _self._error(e));
34 });
35};
361util.inherits(Connection, EventEmitter);
37
381Connection.prototype._error = function (name, msg) {
398 var e;
408 if (name instanceof Error) {
416 e = name;
426 e.name = (e.name && 'Error' !== e.name) ? e.name : 'MysqlError';
43 } else {
442 e = new Error(msg || name);
452 e.name = name;
46 }
478 e.message = util.format('%s (%s)', e.message, this._name);
488 return e;
49};
50
511Connection.prototype.close = function () {
52
537 if (this._flag < 0) {
541 return;
55 }
56
576 this._flag = -1;
586 this._conn.end();
59};
60
611Connection.prototype.query = function (sql, timeout, callback) {
62
6326 if ((typeof sql) === 'object' && sql.params) {
640 sql = this.format(sql.sql, sql.params);
65 }
66
6726 var _self = this;
6826 if (!timeout || timeout < 1) {
696 return this._conn.query(sql, function (e, r) {
706 callback(e ? _self._error(e) : null, r);
71 });
72 }
73
7420 var timer = setTimeout(function () {
752 callback(_self._error('QueryTimeout', 'Mysql query timeout after ' + timeout + ' ms'));
762 callback = function (e, r) {
772 _self.emit('late', e, r, sql);
78 };
79 }, timeout);
8020 _self._conn.query(sql, function (e, r) {
8120 clearTimeout(timer);
8220 timer = null;
8320 callback(e ? _self._error(e) : null, r);
84 });
85};
86
871Connection.prototype.format = function (sql, params) {
881 return sql.replace(/:(\w+)/g, function (w, i) {
895 return sqlString.escape(params[i]);
90 });
91};
92
931exports.create = function (options) {
9415 return new Connection(options);
95};
96

pool.js

99%
111
110
1
LineHitsSource
1/* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */
2
32"use strict";
4
52var util = require('util');
62var Emitter = require('events').EventEmitter;
7
82var Queue = require('safequeue');
92var Connection = require(__dirname + '/connection.js');
10
112var HEARTBEAT_TIMEOUT = 1000;
12
132var _QueueTimeoutError = function (msg) {
141 var e = new Error(msg);
151 e.name = 'QueueTimeout';
161 return e;
17};
18
192exports.create = function (options, config) {
20
2110 var _options = {
22 'maxconnections' : 4,
23 'maxidletime' : 60000,
24 };
2510 for (var i in options) {
2611 _options[i] = options[i];
27 }
28 /**
29 * @ 连接数组
30 */
3110 var conns = [];
32
33 /**
34 * @ 重连暂停
35 */
3610 var pause = 100;
37
38 /**
39 * @ 心跳SQL
40 */
4110 var hbsql = 'SHOW VARIABLES LIKE "READ_ONLY"';
42
43 /**
44 * @ 心跳计时器
45 */
4610 var tbeat = null;
47
48 /**
49 * @ 空闲计时器
50 */
5110 var timer = {};
52
53 /* {{{ private function startup() */
5410 var startup = function (o) {
55
5613 clearTimeout(tbeat);
5713 tbeat = null;
58
5913 var c = Connection.create(config);
6013 c.on('error', function () {});
61
6213 var s = '';
6313 conns.unshift(c);
6413 (function heartbeat() {
6520 c.query(hbsql, HEARTBEAT_TIMEOUT, function (e, r) {
6620 if (e) {
675 o.emit('state');
685 conns.shift();
695 setTimeout(function () {
703 if (c) {
713 c.close();
723 c = null;
73 }
743 startup(o);
75 }, pause);
76
775 pause = Math.min(pause + pause, 60000);
785 o.emit('error', e);
79 } else {
8015 tbeat = setTimeout(heartbeat, 10 * HEARTBEAT_TIMEOUT);
8115 pause = 100;
8215 var t = JSON.stringify(r);
8315 if (t !== s) {
849 s = t;
859 o.emit('state', r);
86 }
87 }
88 });
89 })();
90 };
91 /* }}} */
92
93 /* {{{ private function _remove() */
9410 var _remove = function (c, o) {
954 var i = conns.indexOf(c);
964 if (i < 0) {
970 return;
98 }
994 c.close();
1004 c = null;
1014 conns.splice(i, 1);
1024 i = o._stack.indexOf(i);
1034 if (i > -1) {
1041 o._stack.splice(i, 1);
105 }
106 };
107 /* }}} */
108
109 /* {{{ private function execute() */
11010 var execute = function (c, o, s) {
11126 c.query(s[0], s[1], function (e, r) {
11226 (s[2])(e, r);
11326 if (e && e.fatal) {
1141 _remove(c, o);
1151 return;
116 }
117
11825 s = o._queue.shift();
11925 if (!s) {
12011 release(o, c);
121 } else {
12214 execute(c, o, s);
123 }
124 });
125 };
126 /* }}} */
127
128 /* {{{ private function _wakeup() */
129 /**
130 * wake up a pool to execute query
131 */
13210 var _wakeup = function (o) {
13317 var m = Math.min(_options.maxconnections, 1 + _options.maxconnections + o._stack.length - conns.length);
13417 while (m && o._queue.size()) {
13512 var s = o._queue.shift();
13612 var i, c;
13712 do {
13812 i = o._stack.pop();
13912 if (i && conns[i]) {
1402 c = conns[i];
1412 clearTimeout(timer[i]);
1422 timer[i] = null;
143 }
144 } while (i && !c);
145
14612 if (!c) {
14710 c = Connection.create(config);
14810 conns.push(c);
14910 ['error', 'close'].forEach(function (k) {
15020 c.once(k, function (e) {
1512 _remove(c, o);
152 });
153 });
154 }
15512 execute(c, o, s);
15612 m--;
157 }
158 };
159 /* }}} */
160
161 /* {{{ private function release() */
16210 var release = function (o, c) {
16311 var i = conns.indexOf(c);
164 // XXX: we use conns[0] to heartbeat
16511 if (i > 0) {
16611 o._stack.push(i);
16711 timer[i] = setTimeout(function () {
1681 _remove(c, o);
169 }, _options.maxidletime);
170 }
171 };
172 /* }}} */
173
17410 var Mysql = function () {
175
17610 Emitter.call(this);
17710 startup(this);
178
179 /**
180 * @ 执行队列
181 */
18210 var _name = this._name();
18310 this._queue = Queue.create({'timeout' : 0, 'maxitem' : 0});
18410 this._queue.on('timeout', function (item, tmout, pos) {
1851 (item[2])(_QueueTimeoutError(util.format(
186 'Query stays in the queue more than %d ms (%s)', tmout, _name)));
187 });
188
18910 var _self = this;
19010 this._queue.on('fill', function () {
19117 _wakeup(_self);
192 });
193
194 /**
195 * @ 空闲连接
196 */
19710 this._stack = [];
198
199 };
20010 util.inherits(Mysql, Emitter);
201
202 /* {{{ public prototype _name() */
20310 Mysql.prototype._name = function () {
20415 return conns[0]._name;
205 };
206 /* }}} */
207
208 /* {{{ public prototype query() */
209 /**
210 * Get one connection and run sql
211 *
212 * @ param {String|Object} sql
213 * @ param {Function} cb
214 */
21510 Mysql.prototype.query = function (sql, tmout, cb) {
21627 this._queue.push([sql, tmout, cb], tmout);
21727 var n = this._queue.size();
21827 if (n > 0) {
21915 this.emit('busy', n, _options.maxconnections);
220 }
221 };
222 /* }}} */
223
224 /* {{{ public prototype setHeartBeatQuery() */
22510 Mysql.prototype.setHeartBeatQuery = function (sql) {
2267 hbsql = sql;
227 };
228 /* }}} */
229
23010 return new Mysql();
231};
232