Hunter的大杂烩

November 13, 2010

rw-splitting.lua注释版

Filed under: 技术话题 — hunter @ 2:49 am

 

-- Annotated by hunter, 2010/10

-- a flexible statement based load balancer with connection pooling
--
-- * build a connection pool of min_idle_connections for each backend and maintain
--   its size
-- *

-- MySQL Proxy helper modules used by this script.
-- (Module names must be quoted with plain ASCII double quotes; the
-- typographic quotes from the blog rendering are not valid Lua.)
local commands    = require("proxy.commands")
local tokenizer   = require("proxy.tokenizer")
local lb          = require("proxy.balance")
local auto_config = require("proxy.auto-config")

--- config
--
-- connection pool
--
-- Install the default rwsplit settings only if none exist yet in
-- proxy.global.config, so an already-present configuration is kept.
if not proxy.global.config.rwsplit then
  proxy.global.config.rwsplit = { -- initialize the configuration table
    min_idle_connections = 4,
    max_idle_connections = 8,
    max_connection_per_backend = 50, -- at most 50 connections per server

    is_debug = false
  }
end


--- read/write splitting sends all non-transactional SELECTs to the slaves
--
-- is_in_transaction tracks the state of the transactions
local is_in_transaction       = false

-- if this was a SELECT SQL_CALC_FOUND_ROWS ... stay on the same connections
local is_in_select_calc_found_rows = false


---
-- get a connection to a backend
--
-- as long as we don't have enough connections in the pool, create new connections
--
-- Step 1 of the proxy's processing flow.
--
-- Copies the pool limits from proxy.global.config.rwsplit onto every backend
-- pool, then selects a backend by setting proxy.connection.backend_ndx.
--
-- @return proxy.PROXY_IGNORE_RESULT when proxy.connection.server is already
--         set (a pooled connection is reused); nothing otherwise, which
--         (per the annotation at the end of the function) makes the proxy
--         open a new connection.
function connect_server()
  local is_debug = proxy.global.config.rwsplit.is_debug
  -- make sure that we connect to each backend at least ones to
  -- keep the connections to the servers alive
  --
  -- on read_query we can switch the backends again to another backend

  if is_debug then
    print()
    print("[connect_server] " .. proxy.connection.client.src.name)
  end

  -- index of the first master (RW backend) that is not DOWN; 0 = none seen yet
  local rw_ndx = 0

  -- init all backends
  -- in Lua, a table (similar to a PHP array) is indexed starting from 1
  -- proxy.global.backends[N] should be one MySQL server
  for i = 1, #proxy.global.backends do
    local s        = proxy.global.backends[i]
    local pool     = s.pool -- we don't have a username yet, try to find a connections which is idling
    local cur_idle = pool.users[""].cur_idle_connections

    pool.min_idle_connections = proxy.global.config.rwsplit.min_idle_connections
    pool.max_idle_connections = proxy.global.config.rwsplit.max_idle_connections
    pool.max_connection_per_backend = proxy.global.config.rwsplit.max_connection_per_backend

    if is_debug then
      print("  [".. i .."].connected_clients = " .. s.connected_clients)
      print("  [".. i .."].pool.cur_idle     = " .. cur_idle)
      print("  [".. i .."].pool.max_idle     = " .. pool.max_idle_connections)
      print("  [".. i .."].pool.min_idle     = " .. pool.min_idle_connections)
      print("  [".. i .."].type = " .. s.type)
      print("  [".. i .."].state = " .. s.state)
    end

    -- prefer connections to the master
    -- Annotator's question: does this just pick any backend server?
    if s.type == proxy.BACKEND_TYPE_RW and
       s.state ~= proxy.BACKEND_STATE_DOWN and
       cur_idle < pool.min_idle_connections then
      -- Annotator's question: what exactly does the condition
      -- cur_idle < pool.min_idle_connections mean?
      -- proxy.connection is an internal data structure, see
      -- http://dev.mysql.com/doc/refman/5.0/en/mysql-proxy-scripting-structures.html#mysql-proxy-scripting-structures-connection
      proxy.connection.backend_ndx = i -- Annotator's question: is proxy.global.backends[i] idle?
      break
    elseif s.type == proxy.BACKEND_TYPE_RO and
           s.state ~= proxy.BACKEND_STATE_DOWN and
           cur_idle < pool.min_idle_connections then
      proxy.connection.backend_ndx = i
      break
    elseif s.type == proxy.BACKEND_TYPE_RW and
           s.state ~= proxy.BACKEND_STATE_DOWN and
           rw_ndx == 0 then
      rw_ndx = i
    end
  end

  -- Annotator's question: has proxy.connection.backend_ndx been assigned?
  -- if no suitable connection was found, create a new connection to the master server
  if proxy.connection.backend_ndx == 0 then
    if is_debug then
      print("  [" .. rw_ndx .. "] taking master as default")
    end
    proxy.connection.backend_ndx = rw_ndx
  end

  -- pick a random backend
  --
  -- we someone have to skip DOWN backends

  -- ok, did we got a backend ?

  -- if this connection already has a backend selected, return
  if proxy.connection.server then
    if is_debug then
      print("  using pooled connection from: " .. proxy.connection.backend_ndx)
    end

    -- stay with it
    return proxy.PROXY_IGNORE_RESULT
  end

  if is_debug then
    print("  [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle")
  end

  -- open a new connection
  -- not returning proxy.PROXY_IGNORE_RESULT causes a new connection to be created
end

---
-- Step 2 of the proxy's processing flow.
--
-- Intentionally does nothing; the body only holds a commented-out example
-- that rejects clients not connecting from 127.0.0.1.
function read_handshake()
  --[[ Example
  if not proxy.connection.client.src.name:match("^127.0.0.1:") then
    proxy.response.type = proxy.MYSQLD_PACKET_ERR
    proxy.response.errmsg = "only local connects are allowed"

    print("we don't like this client");

    return proxy.PROXY_SEND_RESULT
  end
  --]]
end

---
-- put the successfully authed connection into the connection pool
--
-- Step 3 of the proxy's processing flow.
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
  -- is_debug was previously read here without being declared, so it was
  -- always a nil global and the trace below could never be printed.
  local is_debug = proxy.global.config.rwsplit.is_debug

  if is_debug then
    print("[read_auth_result] " .. proxy.connection.client.src.name)
  end

  local packet_type = auth.packet:byte()

  if packet_type == proxy.MYSQLD_PACKET_OK then
    -- auth was fine, disconnect from the server
    proxy.connection.backend_ndx = 0
  elseif packet_type == proxy.MYSQLD_PACKET_EOF then
    -- we received either a
    --
    -- * MYSQLD_PACKET_ERR and the auth failed or
    -- * MYSQLD_PACKET_EOF which means a OLD PASSWORD (4.0) was sent
    print("(read_auth_result) ... not ok yet");
  elseif packet_type == proxy.MYSQLD_PACKET_ERR then
    -- auth failed
  end
end

---
-- read/write splitting
--
-- Queues the client packet as injection id 1 and picks the backend for it:
-- outside a transaction, a SELECT that does not ask for the last insert id
-- is moved to the backend returned by lb.idle_ro() when that index is > 0.
-- If no backend is selected afterwards, lb.idle_failsafe_rw() is used.
-- When the client and server default DBs differ, a COM_INIT_DB is
-- prepended as injection id 2.
--
-- @param packet the raw client packet
-- @return the result of auto_config.handle() if it handled the command,
--         proxy.PROXY_SEND_RESULT for COM_QUIT,
--         proxy.PROXY_SEND_QUERY otherwise
function read_query( packet )
  local is_debug = proxy.global.config.rwsplit.is_debug
  local cmd      = commands.parse(packet)
  local c        = proxy.connection.client

  -- handle commands other than select/update/delete/insert
  local r = auto_config.handle(cmd)
  if r then
    return r
  end

  -- looks like we have to forward this statement to a backend
  if is_debug then
    print("[read_query] " .. proxy.connection.client.src.name)
    print("  current backend   = " .. proxy.connection.backend_ndx)
    print("  client default db = " .. c.default_db)
    print("  client username   = " .. c.username)
    if cmd.type == proxy.COM_QUERY then
      print("  query             = "        .. cmd.query)
    end
  end

  if cmd.type == proxy.COM_QUIT then
    -- don't send COM_QUIT to the backend. We manage the connection
    -- in all aspects.
    proxy.response = {
      type = proxy.MYSQLD_PACKET_OK,
    }

    if is_debug then
      print("  (QUIT) current backend   = " .. proxy.connection.backend_ndx)
    end

    -- Annotator's question: the server response is faked here; does the
    -- connection then still have a matching server setup?
    return proxy.PROXY_SEND_RESULT
  end

  proxy.queries:append(1, packet, { resultset_is_needed = true })

  -- read/write splitting
  --
  -- send all non-transactional SELECTs to a slave
  if not is_in_transaction and
     cmd.type == proxy.COM_QUERY then
    local tokens = assert(tokenizer.tokenize(cmd.query))
    local stmt   = tokenizer.first_stmt_token(tokens)

    -- Annotator's question: should the BEGIN/START/COMMIT keywords be checked here?
    if stmt.token_name == "TK_SQL_SELECT" then
      is_in_select_calc_found_rows = false
      local is_insert_id = false

      for i = 1, #tokens do
        local token = tokens[i]
        -- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed
        -- on the same connection
        -- print("token: " .. token.token_name)
        -- print("  val: " .. token.text)

        if not is_in_select_calc_found_rows and token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" then
          is_in_select_calc_found_rows = true
        elseif not is_insert_id and token.token_name == "TK_LITERAL" then
          local utext = token.text:upper()

          if utext == "LAST_INSERT_ID" or
             utext == "@@INSERT_ID" then
            is_insert_id = true
          end
        end

        -- we found the two special token, we can't find more
        if is_insert_id and is_in_select_calc_found_rows then
          break
        end
      end

      -- if we ask for the last-insert-id we have to ask it on the original
      -- connection
      if not is_insert_id then
        local backend_ndx = lb.idle_ro()

        if backend_ndx > 0 then
          proxy.connection.backend_ndx = backend_ndx
        end
      else
        print("   found a SELECT LAST_INSERT_ID(), staying on the same backend")
      end
    end
  end

  -- no backend selected yet, pick a master
  if proxy.connection.backend_ndx == 0 then
    -- we don't have a backend right now
    --
    -- let's pick a master as a good default
    --
    proxy.connection.backend_ndx = lb.idle_failsafe_rw()
  end

  -- by now we should have a backend
  --
  -- in case the master is down, we have to close the client connections
  -- otherwise we can go on
  if proxy.connection.backend_ndx == 0 then
    return proxy.PROXY_SEND_QUERY
  end

  local s = proxy.connection.server

  -- if client and server db don't match, adjust the server-side
  --
  -- skip it if we send a INIT_DB anyway
  if cmd.type ~= proxy.COM_INIT_DB and
     c.default_db and c.default_db ~= s.default_db then
    print("    server default db: " .. s.default_db)
    print("    client default db: " .. c.default_db)
    print("    syncronizing")
    proxy.queries:prepend(2, string.char(proxy.COM_INIT_DB) .. c.default_db, { resultset_is_needed = true })
  end

  -- send to master
  if is_debug then
    if proxy.connection.backend_ndx > 0 then
      local b = proxy.global.backends[proxy.connection.backend_ndx]
      print("  sending to backend : " .. b.dst.name);
      print("    is_slave         : " .. tostring(b.type == proxy.BACKEND_TYPE_RO));
      print("    server default db: " .. s.default_db)
      print("    server username  : " .. s.username)
    end
    print("    in_trans        : " .. tostring(is_in_transaction))
    print("    in_calc_found   : " .. tostring(is_in_select_calc_found_rows))
    print("    COM_QUERY       : " .. tostring(cmd.type == proxy.COM_QUERY))
  end

  return proxy.PROXY_SEND_QUERY
end


---
-- as long as we are in a transaction keep the connection
-- otherwise release it so another client can use it
--
-- @param inj the injection whose result arrived; inj.id == 1 is the client's
--            own query, inj.id == 2 is the injected INIT_DB
-- @return proxy.PROXY_SEND_RESULT (with an error response) when the injected
--         INIT_DB failed, proxy.PROXY_IGNORE_RESULT for any other injected
--         query, nothing for the client's own query
function read_query_result( inj )
  local is_debug = proxy.global.config.rwsplit.is_debug
  local res      = assert(inj.resultset)
  local flags    = res.flags

  if inj.id ~= 1 then
    -- ignore the result of the USE
    -- the DB might not exist on the backend, what do do ?
    --
    if inj.id == 2 then
      -- the injected INIT_DB failed as the slave doesn't have this DB
      -- or doesn't have permissions to read from it
      if res.query_status == proxy.MYSQLD_PACKET_ERR then
        proxy.queries:reset()

        proxy.response = {
          type = proxy.MYSQLD_PACKET_ERR,
          errmsg = "can't change DB ".. proxy.connection.client.default_db ..
            " to on slave " .. proxy.global.backends[proxy.connection.backend_ndx].dst.name
        }

        return proxy.PROXY_SEND_RESULT
      end
    end
    return proxy.PROXY_IGNORE_RESULT
  end

  -- remember the transaction state for the next read_query call
  is_in_transaction = flags.in_trans
  local have_last_insert_id = (res.insert_id and (res.insert_id > 0))

  if not is_in_transaction and
     not is_in_select_calc_found_rows and
     not have_last_insert_id then
    -- release the backend
    proxy.connection.backend_ndx = 0
  elseif is_debug then
    print("(read_query_result) staying on the same backend")
    print("    in_trans        : " .. tostring(is_in_transaction))
    print("    in_calc_found   : " .. tostring(is_in_select_calc_found_rows))
    print("    have_insert_id  : " .. tostring(have_last_insert_id))
  end
end


---
-- close the connections if we have enough connections in the pool
--
-- @return nil - close connection
--         IGNORE_RESULT - store connection in the pool
function disconnect_client()
  local is_debug = proxy.global.config.rwsplit.is_debug
  if is_debug then
    print("[disconnect_client] " .. proxy.connection.client.src.name)
  end

  -- make sure we are disconnection from the connection
  -- to move the connection into the pool
  proxy.connection.backend_ndx = 0
end

 

No Comments

No comments yet.

RSS feed for comments on this post.

Sorry, the comment form is closed at this time.

Powered by WordPress