mysql基于binlog回滾工具案例詳細(xì)說明
發(fā)表時(shí)間:2023-09-02 來源:明輝站整理相關(guān)軟件相關(guān)文章人氣:
[摘要]update、delete的條件寫錯(cuò)甚至沒有寫,導(dǎo)致數(shù)據(jù)操作錯(cuò)誤,需要恢復(fù)被誤操作的行記錄。這種情形,其實(shí)時(shí)有發(fā)生,可以選擇用備份文件+binlog來恢復(fù)到測試環(huán)境,然后再做數(shù)據(jù)修復(fù),但是這樣其實(shí)需要耗費(fèi)一定的時(shí)間跟資源。 其實(shí),如果binlog format為row,binlog文件中是會...
update、delete的條件寫錯(cuò)甚至沒有寫,導(dǎo)致數(shù)據(jù)操作錯(cuò)誤,需要恢復(fù)被誤操作的行記錄。這種情形,其實(shí)時(shí)有發(fā)生,可以選擇用備份文件+binlog來恢復(fù)到測試環(huán)境,然后再做數(shù)據(jù)修復(fù),但是這樣其實(shí)需要耗費(fèi)一定的時(shí)間跟資源。
其實(shí),如果binlog format為row,binlog文件中是會詳細(xì)記錄每一個(gè)事務(wù)涉及到操作,并把每一個(gè)事務(wù)影響到行記錄均存儲起來,能否給予binlog 文件來反解析數(shù)據(jù)庫的行記錄變動(dòng)情況呢?
業(yè)界已有不少相關(guān)的腳本及工具,但是隨著MySQL版本的更新、binlog記錄內(nèi)容的變化以及需求不一致,大多數(shù)腳本不太適合個(gè)人目前的使用需求,所以開始著手編寫 mysql的 flash back腳本。
如果轉(zhuǎn)載,請注明博文來源: www.cnblogs.com/xinysu/ ,版權(quán)歸 博客園 蘇家小蘿卜 所有。望各位支持!
僅在MySQL 5.6/5.7版本測試,python運(yùn)行環(huán)境需要安裝pymysql模塊。
1 實(shí)現(xiàn)內(nèi)容
根據(jù)binlog文件,對某個(gè)\些事務(wù)、某段時(shí)間某些表、某段時(shí)間全庫 做回滾操作,實(shí)現(xiàn)閃回功能。工具處理過程中,會把binlog中的事務(wù)修改的行記錄存儲到表格中去,通過 dml_sql 列,可以查看每一個(gè)事務(wù)內(nèi)部的所有行記錄變更情況,通過 undo_sql 查看回滾的SQL內(nèi)容。如下圖,然后再根據(jù)表格內(nèi)容做回滾操作。
那么這個(gè)腳本有哪些優(yōu)點(diǎn)呢?
回滾分為2個(gè)命令:第一個(gè)命令 分析binglog并存儲進(jìn)入數(shù)據(jù)庫;第二個(gè)命令 執(zhí)行回滾操作;
回滾的時(shí)候,可以把執(zhí)行腳本跟回滾腳本統(tǒng)一存放到數(shù)據(jù)庫中,可以查看 更新內(nèi)容以及回滾內(nèi)容;
根據(jù)存儲的分析表格,方便指定事務(wù)或者指定表格來來恢復(fù);
詳細(xì)的日志輸出,說明分析進(jìn)度跟執(zhí)行進(jìn)度。
分析binlog的輸出截圖(分析1G的binlog文件)
回滾數(shù)據(jù)庫的輸出截圖:
2 原理
前提:實(shí)例啟動(dòng)了binlog并且格式為ROW。
使用python對mysqlbinlog后的log文件進(jìn)行文本分析處理,在整個(gè)處理過程中,有以下6個(gè)疑難點(diǎn)需要處理:
判斷一個(gè)事務(wù)的開始跟結(jié)束
同一個(gè)事務(wù)的執(zhí)行順序需要反序執(zhí)行
解析回滾SQL
同一個(gè)事務(wù)操作不同表格處理
轉(zhuǎn)義字符處理,比如換行符、tab符等等
timestamp數(shù)據(jù)類型參數(shù)值轉(zhuǎn)換
負(fù)數(shù)處理
單個(gè)事務(wù)涉及到行修改SQL操作了 max_allow
針對某個(gè)表格做回滾,而不是全庫回滾
2.1 事務(wù)的開始與結(jié)束
按照Xid出現(xiàn)的位置來判斷,從binlog文件的最開始開始讀取,遇到SQL語句則提取出來,直到遇到Xid,統(tǒng)一把之前提取出來的SQL匯總為一個(gè)事務(wù),然后繼續(xù)提取SQL語句,直到遇到下一個(gè)Xid,再把這個(gè)事務(wù)的SQL匯總成一個(gè)事務(wù),一直這樣循環(huán),直至文件順序遍歷結(jié)束。
2.2 事務(wù)內(nèi)部反序處理
同一個(gè)事務(wù)中,如果有多個(gè)表格多行記錄發(fā)生變更,在回滾的時(shí)候,應(yīng)該反序回滾SQL,那么,如何將提取出來的SQL反序存儲呢?思路如下:
每行記錄的修改SQL獨(dú)立出來
將獨(dú)立出來的SQL反序存儲
假設(shè)正序的事務(wù)SQL語句存儲在變量 dml_sql 中,反序后的可以回滾的SQL存儲在變量 undo_sql中。按順序把行記錄修改的SQL抽取出來 存儲到變量 record_sql 中去,然后 賦值 undo_sql =record_sql + undo_sql ,再置空 record_sql 變量,如此,便可實(shí)現(xiàn)反序事務(wù)內(nèi)部的執(zhí)行SQL。
2.3 解析回滾SQL
首先,查看binlog的日志內(nèi)容,發(fā)現(xiàn)行修改的SQL情形如下,提取過程中需要注意這幾個(gè)問題:
行記錄的列名配對,binlog file存儲的列序號,不能直接使用
WHERE部分 跟 SET部分 之間并無關(guān)鍵字或者符號,需要添加 AND 或者 逗號
DELETE SQL 需要反轉(zhuǎn)為 INSERT
UPDATE SQL 需要把WHERE 跟 SET的部分進(jìn)行替換
INSERT SQL需要反轉(zhuǎn)為 DELETE
2.4 同事務(wù)不同表格處理
同一個(gè)事務(wù)中,允許對不同表格進(jìn)行數(shù)據(jù)修改,這點(diǎn)在列名替換列序號的時(shí)候,需要留意處理。
每一個(gè)的行記錄前有一行記錄,含有 'Table_map' 標(biāo)識,會說明這一行當(dāng)行記錄是修改哪個(gè)表格,可以根據(jù)這個(gè)提示,來替換binlog里邊的列序號為列名。
2.5 轉(zhuǎn)義字符處理
binlog文件在對非空格的空白字符處理,采用轉(zhuǎn)義字符字符串存儲,比如,在表格insert一列記錄含換行符,而實(shí)際上在binlog文件中,是使用了 \x0a 替換了 換行操作,所以在回滾數(shù)據(jù)的過程中,需要對轉(zhuǎn)義字符做處理。
這里注意一個(gè)地方,039的轉(zhuǎn)義字符是沒有在函數(shù) esc_code 中統(tǒng)一處理,而是單獨(dú)做另外處理。
轉(zhuǎn)移字符表相見下圖:
2.6 timestamp數(shù)據(jù)類型處理
timestamp實(shí)際在數(shù)據(jù)庫中的存儲值是 INT類型,需要使用 函數(shù) from_unixtime轉(zhuǎn)換。
建立測試表格tbtest,只有一列timestamp的列,存儲值后查看binlog的內(nèi)容,具體截圖如下:
在處理行記錄的時(shí)候,要對timestamp的value做處理,添加from_unixtime函數(shù)轉(zhuǎn)換。
2.7 負(fù)數(shù)值處理
這個(gè)一開始寫代碼的時(shí)候,并沒有考慮到。大量測試的過程中發(fā)現(xiàn),所有整型的數(shù)據(jù)類型,在存儲負(fù)數(shù)的時(shí)候,都會存入一個(gè)最大范圍值。binlog在處理這塊的機(jī)制有些不是很了解。測試如下:
所以當(dāng)遇到INT的各種數(shù)據(jù)類型并且VALUE為負(fù)數(shù)的時(shí)候,需要把 這個(gè)范圍值去除,才能執(zhí)行執(zhí)行undo_sql。
2.8 單個(gè)事務(wù)行記錄總SQL超過max_allowed_package處理
分析binlog后存儲兩種sql類型,一種是行記錄的修改SQL,即 dml_sql;一種是 行記錄的回滾sql,即 undo_sql。從代碼可知,存儲這兩個(gè)sql的列是longtext,最大可存儲4G的內(nèi)容。但是 MySQL中單個(gè)會話的包大小是有限制的,限制的參數(shù)為 max_allowed_packet,默認(rèn)大小為 4Mb,最大為1G,所以這個(gè)腳本使用前,請手動(dòng)設(shè)置 存儲binlog file的數(shù)據(jù)庫實(shí)例以及線上的數(shù)據(jù)庫實(shí)例這個(gè)參數(shù):
set global max_allowed_packet = 1073741824; #記得后續(xù)修改回來
萬一操作了呢?那么回滾只能分段來回滾,先回滾到 這個(gè)大事務(wù),然后單獨(dú)執(zhí)行這個(gè)大事務(wù),緊接著繼續(xù)回滾,這部分不能使用pymysql嗲用source 文件執(zhí)行,所以只能手動(dòng)做這個(gè)操作。 求高能人士修改這個(gè)邏輯代碼!。
2.9 針對性回滾
假設(shè)誤操作的沒有明確的時(shí)間點(diǎn),只有一個(gè)區(qū)間,而這個(gè)區(qū)間還有其他的表格操作,那么這個(gè)時(shí)候,需要在分析binlog文件的時(shí)候,添加--database選項(xiàng),先帥選到同一個(gè)數(shù)據(jù)庫中binlog文件中。
這里的處理是將這段區(qū)間的dml_sql跟undo_sql都存儲到數(shù)據(jù)庫表格中,然后再刪除不需要回滾的事務(wù),剩余需要回滾的事務(wù)。再執(zhí)行回滾操作。
3 使用說明
3.1 參數(shù)說明
這個(gè)腳本的參數(shù)稍微多些,可以 --help 查看具體說明。
本人喜歡用各種顏色來分類參數(shù)(blingbling五顏六色,看著多有趣多精神),所以,按顏色來說明這些參數(shù)。
黃色區(qū)域:這6個(gè)參數(shù),提供的是 分析并存儲binlog file的相關(guān)值,說明存儲分析結(jié)果的數(shù)據(jù)庫的鏈接方式、binlog文件的位置以及存儲結(jié)果的表格名字;
藍(lán)色區(qū)域:這4個(gè)參數(shù),提供 與線上數(shù)據(jù)庫表結(jié)構(gòu)一致的DB實(shí)例連接方式,僅需跟線上一模一樣的表結(jié)構(gòu),不一定需要是主從庫;
綠色區(qū)域:最最重要的選項(xiàng) -a,0代表僅分析binlog文件,1代表僅執(zhí)行回滾操作,必須先執(zhí)行0才可以執(zhí)行1;
紫色區(qū)域:舉例說明。
3.2 應(yīng)用場景說明
全庫回滾某段時(shí)間
需要回滾某個(gè)時(shí)間段的所有SQL操作,回滾到某一個(gè)時(shí)間點(diǎn)
這種情況下呢,大多數(shù)是使用備份文件+binlog解決
但是這個(gè)腳本也可以滿足,但請勿直接在線上操作,先 -a=0,看下分析結(jié)果,是否符合,符合的話,停掉某個(gè)從庫,再在從庫上執(zhí)行,最后開發(fā)業(yè)務(wù)接入檢查是否恢復(fù)到指定時(shí)間點(diǎn),數(shù)據(jù)是否正常。
某段時(shí)間某些表格回滾某些操作
回滾某個(gè)/些SQL
3.3 測試案例
3.3.1 全庫回滾某段時(shí)間
假設(shè)需要回滾9點(diǎn)10分到9點(diǎn)15分間數(shù)據(jù)庫的所有操作:
準(zhǔn)備測試環(huán)境實(shí)例存儲分析后的數(shù)據(jù)
測試環(huán)境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python腳本分析文件,action=0
線上測試環(huán)境修改set global max_allowed_packet = 1073741824
回滾數(shù)據(jù),action=1
線上測試環(huán)境修改set global max_allowed_packet = 4194304
1 --測試環(huán)境(請安裝pymysql):IP: 192.168.9.242,PORT:3310 ,數(shù)據(jù)庫:flashback,表格:tbevent 2 --具有線上表結(jié)構(gòu)的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 Variable_name Value 8 +--------------------+----------+ 9 max_allowed_packet 16777216 10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空間忽略不截圖--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
3.3.2 某段時(shí)間某些表格回滾某些操作
準(zhǔn)備測試環(huán)境實(shí)例存儲分析后的數(shù)據(jù)
測試環(huán)境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python腳本分析文件,action=0
分析帥選需要的事務(wù),rename表格
dump 對應(yīng)的表格到測試環(huán)境
回滾數(shù)據(jù),action=1
提交給開發(fā)業(yè)務(wù)對比數(shù)據(jù)
3.3.3 回滾某個(gè)/些SQL
準(zhǔn)備測試環(huán)境實(shí)例存儲分析后的數(shù)據(jù)
測試環(huán)境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python腳本分析文件,action=0
分析帥選需要的事務(wù),rename表格
dump 對應(yīng)的表格到測試環(huán)境
回滾數(shù)據(jù),action=1
提交給開發(fā)業(yè)務(wù)對比數(shù)據(jù)
4 python腳本
腳本會不定期修復(fù)bug,若是感興趣,可以往github下載: 中的 mysql_xinysu_flashback 。
1 # -*- coding: utf-8 -*- 2 __author__ = 'xinysu' 3 __date__ = '2017/6/15 10:30' 4 5 6 7 import re 8 import os 9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16 17 import pymysql 18 from pymysql.cursors import DictCursor 19 20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h : host, the database host,which database will store the results after analysis 24 -u : user, the db user 25 -p : password, the db user's password 26 -P : port, the db port 27 28 -f : file path, the binlog file 29 -t : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30 when you want to store in `test` db and the table name is `tbevent`,then this parameter
31 is test.tbevent 32 \033[1;34;40m 33 -oh : online host, the database host,which database have the online table schema 34 -ou : online user, the db user 35 -op : online password, the db user's password 36 -oP : online port, the db port 37 \033[1;32;40m 38 -a : action,
39 0 just analyse the binlog file ,and store sql in table;
40 1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m
42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
47 -oh=192.168.9.244 -oP=3310 -u=root -op=***
48 -a=0 49 50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
52 -oh=192.168.9.244 -oP=3310 -u=root -op=***
53 -a=1 54 \033[0m
55 ''' 56 57 class flashback: 58 def __init__(self): 59 self.host='' 60 self.user='' 61 self.password='' 62 self.port='3306' 63 self.fpath='' 64 self.tbevent='' 65 66 self.on_host='' 67 self.on_user='' 68 self.on_password='' 69 self.on_port='3306' 70 71 self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72 73 self._get_db() # 從輸入?yún)?shù)獲取連接數(shù)據(jù)庫的相關(guān)參數(shù)值 74 75 # 連接數(shù)據(jù)庫,該數(shù)據(jù)庫是用來存儲binlog文件分析后的內(nèi)容 76 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78 self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79 logging.info('MySQL which userd to store binlog event connection is ok') 80 81 # 連接數(shù)據(jù)庫,該數(shù)據(jù)庫的表結(jié)構(gòu)必須跟binlogfile基于對數(shù)據(jù)庫表結(jié)構(gòu)一致 82 # 該數(shù)據(jù)庫用于提供 binlog file 文件中涉及到表結(jié)構(gòu)分析 83 logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84 self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85 self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86 logging.info('MySQL which userd to analyse online table schema connection is ok') 87 88 logging.info('\033[33mMySQL connection is ok\033[0m') 89 90 self.dml_sql='' 91 self.undo_sql='' 92 93 self.tbfield_where = [] 94 self.tbfield_set = [] 95 96 self.begin_time='' 97 self.db_name='' 98 self.tb_name='' 99 self.end_time=''100 self.end_pos=''101 self.sqltype=0102 103 #_get_db用于獲取執(zhí)行命令的輸入?yún)?shù)104 def _get_db(self):105 logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108 sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111 sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114 _argv = i.split('=')115 if _argv[0] == '-h':116 self.host = _argv[1]117 elif _argv[0] == '-u':118 self.user = _argv[1]119 elif _argv[0] == '-P':120 self.port = int(_argv[1])121 elif _argv[0] == '-f':122 self.fpath = _argv[1]123 elif _argv[0] == '-t':124 self.tbevent = _argv[1]125 elif _argv[0] == '-p':126 self.password = _argv[1]127 128 elif _argv[0] == '-oh':129 self.on_host = _argv[1]130 elif _argv[0] == '-ou':131 self.on_user = _argv[1]132 elif _argv[0] == '-oP':133 self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135 self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138 self.action = _argv[1]139 140 else:141 print(usage)142 143 #創(chuàng)建表格,用于存儲分析后的BINLOG內(nèi)容144 def create_tab(self):145 logging.info('creating table {} to store binlog event'.format(self.tbevent))146 create_tb_sql ='''147 CREATE TABLE IF NOT EXISTS {}(148 auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149 binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150 dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151 dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152 end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153 db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154 table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155 sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156 dml_sql LONGTEXT NULL COMMENT 'what sql excuted',157 undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158 PRIMARY KEY (auto_id),159 INDEX sqltype(sqltype),160 INDEX dml_start_time (dml_start_time),161 INDEX dml_end_time (dml_end_time),162 INDEX end_log_pos (end_log_pos),163 INDEX db_name (db_name),164 INDEX table_name (table_name)165 )166 COLLATE='utf8_general_ci' ENGINE=InnoDB;167 TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170 self.cur.execute(create_tb_sql)171 logging.info('created table {} '.format(self.tbevent))172 173 #獲取表格的列順序?qū)?yīng)的列名,并處理where set的時(shí)候,列與列之間的連接字符串是逗號還是 and174 def tbschema(self,dbname,tbname):175 self.tbfield_where = []176 self.tbfield_set = []177 178 sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180 self.on_cur.execute(sql_tb)181 tbcol=self.on_cur.fetchall()182 183 i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187 self.tbfield_where.append('`'+l['Field']+'`')188 self.tbfield_set.append('`'+l['Field']+'`')189 i+=1190 else:191 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一個(gè)事務(wù)記錄一行,若binlog file中的行記錄包含 Table_map,則為事務(wù)的開始記錄195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198 l = bl_line.index('server')199 m = bl_line.index('end_log_pos')200 n = bl_line.index('Table_map')201 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217 l = bl_line.index('server')218 m = bl_line.index('end_log_pos')219 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229 self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231 self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233 self.sqltype=3234 235 record_sql = ''236 undosql_desc = ''237 238 #同個(gè)事務(wù)內(nèi)部的行記錄修改SQL,反序存儲239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241 undosql_desc = record_sql + undosql_desc242 record_sql = ''243 record_sql = record_sql + l244 else:245 record_sql = record_sql + l246 247 self.undo_sql = record_sql + undosql_desc248 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #處理非空格的空白特殊字符251 self.dml_sql = self.esc_code(self.dml_sql)252 self.undo_sql = self.esc_code(self.undo_sql)253 254 #單獨(dú)處理 轉(zhuǎn)移字符: \'255 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''") # + ';'256 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''") # + ';'257 258 if len(self.dml_sql)>500000000:259 with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260 w_f.write('begin;' + '\n')261 w_f.write(self.undo_sql)262 w_f.write('commit;' + '\n')263 self.dml_sql=''264 self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265 logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268 self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269 self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271 self.cur.execute(insert_sql)272 self.mysqlconn.commit()273 274 self.dml_sql = ''275 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282 sqlcomma=0283 self.create_tab()284 285 with open(self.fpath,'r') as binlog_file:286 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290 self.rowrecord(bline)291 bline=''292 elif bline.rstrip()=='### SET':293 bline = bline[3:]294 sqlcomma=1295 elif bline.rstrip()=='### WHERE':296 bline = bline[3:]297 sqlcomma = 2298 elif bline.startswith('### @'):299 len_f=len('### @')300 i=bline[len_f:].split('=')[0]301 302 #處理timestamp類型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305 bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #處理負(fù)數(shù)存儲方式308 if bline.split('=')[1].startswith('-'):309 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310 bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313 bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315 bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318 bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321 self.dml_tran(bline)322 bline=''323 else:324 bline = ''325 326 if bline.rstrip('\n') != '':327 self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332 esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336 }337 338 for k,v in esc.items():339 sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344 countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346 self.cur.execute(countsql)347 count_row=self.cur.fetchall()348 349 update_count=0350 insert_couont=0351 delete_count=0352 for row in count_row:353 if row['sqltype']==1:354 insert_couont=row['numbers']355 elif row['sqltype']==2:356 update_count=row['numbers']357 elif row['sqltype']==3:358 delete_count=row['numbers']359 logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #這里會有幾個(gè)問題:363 #1 如果一共有幾十萬甚至更多的事務(wù)操作,那么這個(gè)python腳本,極為占用內(nèi)存,有可能執(zhí)行錯(cuò)誤;364 #2 如果單個(gè)事務(wù)中,涉及修改的行數(shù)高達(dá)幾十萬行,其binlog file 達(dá)好幾G,這里也會有內(nèi)存損耗問題;365 #所以,針對第一點(diǎn),這里考慮對超多事務(wù)進(jìn)行一個(gè)分批執(zhí)行處理,每個(gè)批次處理number個(gè)事務(wù),避免一次性把所有事務(wù)放到python中;但是第2點(diǎn),目前暫未處理366 367 tran_num=1368 id=0369 370 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372 self.cur.execute(tran_num_sql)373 tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376 tran_num=num['table_rows']377 378 logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id<=tran_num:381 logging.info('doing batch : {} '.format(int(id/number)+1))382 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)383 self.cur.execute(undo_sql)384 385 undo_rows=self.cur.fetchall()386 f_sql=''387 388 for u_row in undo_rows:389 try:390 self.on_cur.execute(u_row['undo_sql'])391 self.on_mysqlconn.commit()392 except Exception:393 print('auto_id:',u_row['auto_id'])394 id+=number395 396 397 def undo_file(self,number):398 # 也可以選擇私用undo_file將undo_sql導(dǎo)入到文件中,然后再source399 400 tran_num=1401 id=0402 403 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405 self.cur.execute(tran_num_sql)406 tran_rows=self.cur.fetchall()407 408 for num in tran_rows:409 tran_num=num['table_rows']410 411 logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')412 logging.info('\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))413 414 with open('/tmp/flashback_undosql/undo_file_flashback.sql', 'w') as w_f:415 while id<=tran_num:416 logging.info('doing batch : {} '.format(int(id/number)+1))417 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418 self.cur.execute(undo_sql)419 420 undo_rows=self.cur.fetchall()421 for u_row in undo_rows:422 try:423 w_f.write('begin;' + '\n')424 w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425 w_f.write(u_row['undo_sql'] + '\n')426 w_f.write('commit;' + '\n')427 except Exception:428 print('auto_id',u_row['auto_id'])429 #time.sleep(2)430 id+=number431 432 def do(self):433 if self.action=='0':434 self.analyse_binlog()435 logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436 #self.binlogdesc()437 elif self.action=='1':438 self.undosql(10000)439 440 def closeconn(self):441 self.cur.close()442 self.on_cur.close()443 logging.info('release all db connections')444 logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447 p = flashback()448 p.do()449 p.closeconn()450 451 if __name__ == "__main__":452 main()
以上就是mysql基于binlog回滾工具實(shí)例詳解的詳細(xì)內(nèi)容,更多請關(guān)注php中文網(wǎng)其它相關(guān)文章!
學(xué)習(xí)教程快速掌握從入門到精通的SQL知識。