A Python script for parsing MySQL binlogs

As time goes by, the data behind our online services keeps growing, and since several services share the same server/database, bringing a new service online can affect the existing ones. I therefore wrote a script that parses the MySQL binlog, so that when a problem shows up we can analyse the SQL that was executed and produce a statistics report.
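
The core idea is to let mysqlbinlog do the heavy lifting: with --base64-output=decode-rows -v it decodes row events into readable pseudo-SQL, which can then be filtered and counted with ordinary text tools. Below is a minimal sketch of just that decode step (the binlog path and the time window are placeholder values; the full script that follows adds the filtering and the statistics):

# -*- coding: utf-8 -*-
import subprocess

# placeholder inputs -- the real script reads these from sys.argv
bin_log_file = "/data/mysql/data/mysql-bin.002860"
start_time = "2017-11-16 10:12:00"
end_time = "2017-11-16 10:43:00"

# decode row events into readable pseudo-SQL, restricted to the time window
cmd = ["/usr/local/mysql/bin/mysqlbinlog", "--base64-output=decode-rows", "-v",
       "--start-datetime=%s" % start_time, "--stop-datetime=%s" % end_time,
       bin_log_file]
decoded = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
print(decoded[:2000])  # peek at the beginning of the decoded log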

#-*-coding:utf-8-*-
import os,sys,time,subprocess,re
help="""
\033[1;31;40m******************************************************************************************************
Analyse a MySQL binlog and find the moment with the highest concurrency, as well as the SQL statements that made up that peak
\033[1;32;40mThe script is not written with performance in mind (it includes sort operations); use it carefully on production machines\033[1;31;40m

Usage example:
python stat.py /data/mysql/data/mysql-bin.002860 '2017-11-16 10:12:00' '2017-11-16 10:43:00'
1st argument: path to the binlog file
2nd argument: start time of the statistics window
3rd argument: end time of the statistics window

Generated files:
YYYY-MM-DD_HH:ii:ss.log : human-readable SQL decoded from the binlog
YYYY-MM-DD_HH:ii:ss.log_min : trimmed SQL log, keeping only the first N columns
YYYY-MM-DD_HH:ii:ss.log_format : formatted "time -> sql" records
YYYY-MM-DD_HH:ii:ss.log_concurrency : per-second concurrency counts
YYYY-MM-DD_HH:ii:ss.log_sql : statistics of the SQL executed at the busiest moment
******************************************************************************************************\033[0m
"""
def console(msg):
    print("\033[1;31;40m%s\033[0m" % msg)

if len(sys.argv) >= 2 and sys.argv[1] == '-h':
    print(help)
    exit(0)

if len(sys.argv) < 4:
    print('Need at least 3 parameters')
    exit(1)
bin_log_file=sys.argv[1]
start_time=sys.argv[2]
end_time=sys.argv[3]

# Dates from the window can also show up inside the SQL values themselves (e.g. @1=171116),
# so build date patterns in both long and short forms for the grep filters below
start_date=time.strftime("%Y%m%d", time.strptime(start_time, "%Y-%m-%d %H:%M:%S"))
mini_start_date=start_date[2:]  # drop the century: 20171116 -> 171116
end_date=time.strftime("%Y%m%d", time.strptime(end_time, "%Y-%m-%d %H:%M:%S"))
mini_end_date=end_date[2:]
reg_e_date="%s|%s" % (mini_start_date, mini_end_date)
reg_ev_date="%s|%s|=%s|=%s" % (start_date, end_date, mini_start_date, mini_end_date)
if start_date == end_date:
    reg_e_date=mini_start_date
    reg_ev_date="%s|=%s" % (start_date, mini_start_date)


parsed_bin_file="/data/%s.log" % end_time.replace(' ', '_')
mini_parsed_bin_file="%s_min" % parsed_bin_file
format_parsed_bin_file="%s_format" % parsed_bin_file
stat_concurrency_file="%s_concurrency" % parsed_bin_file
concurrency_sql_file="%s_sql" % parsed_bin_file


# Decode the binlog into human-readable text and save it to a file for the later steps
parse_bin_log_cmd=["/usr/local/mysql/bin/mysqlbinlog", "--base64-output=decode-rows", "-v", "--start-datetime=%s" % start_time, "--stop-datetime=%s" % end_time, bin_log_file]
ret=subprocess.Popen(parse_bin_log_cmd, stdout=subprocess.PIPE).communicate()
f=open(parsed_bin_file, 'w')
f.write(ret[0])
f.close()


# Filter the decoded file, keeping only the timestamp and SQL lines
p1=subprocess.Popen(["cat", parsed_bin_file], stdout=subprocess.PIPE)
p2=subprocess.Popen(["grep", "-E", "%s|INSERT|UPDATE|DELETE|REPLACE" % reg_e_date], stdin=p1.stdout, stdout=subprocess.PIPE)
console("grep -E %s|INSERT|UPDATE|DELETE|REPLACE" % reg_e_date)
# Values such as @1=171116 sometimes appear in the decoded rows; filter them out together with INSERT_ID lines
p3=subprocess.Popen(["grep", "-Ev", "INSERT_ID|%s" % (reg_ev_date)], stdin=p2.stdout, stdout=subprocess.PIPE)
console("grep -Ev INSERT_ID|%s" % (reg_ev_date))
# Note: the awk program must not be wrapped in single quotes here; drop the outer quotes and escape the inner double quotes instead
ret=subprocess.Popen(["/usr/bin/awk", "{print $1\" \"$2\" \"$3\" \"$4\" \"$5}"], stdin=p3.stdout, stdout=subprocess.PIPE).communicate()
console("awk {print $1\" \"$2\" \"$3\" \"$4\" \"$5}")
f=open(mini_parsed_bin_file, 'w')
console("write into %s" % mini_parsed_bin_file)
f.write(ret[0])
f.close()


# Build a "time -> sql" file, which makes it easy to count concurrency and locate the SQL at the busiest moment
f=open(mini_parsed_bin_file, 'r')
# open the output file for writing (truncating any previous content)
format_file=open(format_parsed_bin_file, 'w')
console("write into %s" % format_parsed_bin_file)
line=f.readline()
operate_time=''
sql=''
while line:
    if re.search(reg_e_date, line):
        # timestamp line: the second column is the time of the event
        t_info=line.split()
        if len(t_info) < 2:
            print(t_info)
            line=f.readline()
            continue
        operate_time=t_info[1]
    else:
        # SQL line: pair it with the most recent timestamp seen
        sql=line
        #print("%s -> %s" % (operate_time, sql))
        format_file.write("%s -> %s" % (operate_time, sql))
    line=f.readline()
f.close()
format_file.close()


# Count concurrency: number of operations per second
p1=subprocess.Popen(["cat", format_parsed_bin_file], stdout=subprocess.PIPE)
p2=subprocess.Popen(["/usr/bin/awk","{print $1}"], stdin=p1.stdout, stdout=subprocess.PIPE)
p3=subprocess.Popen(["uniq", "-c"], stdin=p2.stdout, stdout=subprocess.PIPE)
p4=subprocess.Popen(["sort", "-nrk1"], stdin=p3.stdout, stdout=subprocess.PIPE)
ret=p4.communicate()
f=open(stat_concurrency_file, 'w')
console("write into %s" % stat_concurrency_file)
f.write(ret[0])
f.close()


# Find the second with the highest concurrency
p1=subprocess.Popen(["head", "-n", "1", stat_concurrency_file], stdout=subprocess.PIPE)
ret=subprocess.Popen(["/usr/bin/awk","{print $2}"], stdin=p1.stdout, stdout=subprocess.PIPE).communicate()
max_concurrency_time=ret[0].strip()
print("\033[1;31;40mmax concurrency time between %s and %s is \033[1;32;40m%s\033[0m" % (start_time, end_time, max_concurrency_time))


# List the SQL statements that were executed during that busiest second
p1=subprocess.Popen(["cat", format_parsed_bin_file], stdout=subprocess.PIPE)
p2=subprocess.Popen(["grep", max_concurrency_time], stdin=p1.stdout, stdout=subprocess.PIPE)
p3=subprocess.Popen(["/usr/bin/awk", "-F", "->", "{print $2}"], stdin=p2.stdout, stdout=subprocess.PIPE)
p4=subprocess.Popen(["sort"], stdin=p3.stdout, stdout=subprocess.PIPE)
p5=subprocess.Popen(["uniq", "-c"], stdin=p4.stdout, stdout=subprocess.PIPE)
p6=subprocess.Popen(["sort", "-nrk1"], stdin=p5.stdout, stdout=subprocess.PIPE)
ret=p6.communicate()
f=open(concurrency_sql_file, 'w')
console("write into %s" % concurrency_sql_file)
f.write(ret[0])
f.close()
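
Once the script has run, the two files worth reading first are the _concurrency file (one "<count> <time>" line per second, sorted by count descending) and the _sql file (the statements executed during the busiest second, also sorted by count). A small sketch for pulling the top entries out of the concurrency file; the file name is just an example built from the naming scheme above:

# -*- coding: utf-8 -*-
# print the 5 busiest seconds from the generated concurrency file
# (each line has the form "<count> <HH:MM:SS>", already sorted in descending order)
stat_concurrency_file = "/data/2017-11-16_10:43:00.log_concurrency"  # example file name
with open(stat_concurrency_file) as f:
    for line in f.readlines()[:5]:
        count, second = line.split()
        print("%s operations at %s" % (count, second))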