python并发测试脚本

释放双眼,带上耳机,听听看~!

这两天要做性能测试,自己没事用python写了个脚本,用于压力测试

 

Python代码  

# -*- coding: utf8 -*-  
1.
# code by Shurrik  
1.
import threading, time, httplib  
1.
HOST = 
"www.baidu.com"; 
#主机地址 例如192.168.1.101  
1.
PORT = 
80 
#端口  
1.
URI = 
"/?123" 
#相对地址,加参数防止缓存,否则可能会返回304  
1.
TOTAL = 

#总数  
1.
SUCC = 

#响应成功数  
1.
FAIL = 

#响应失败数  
1.
EXCEPT = 

#响应异常数  
1.
MAXTIME=

#最大响应时间  
1.
MINTIME=
100 
#最小响应时间,初始值为100秒  
1.
GT3=

#统计3秒内响应的  
1.
LT3=

#统计大于3秒响应的  
1.
# 创建一个 threading.Thread 的派生类  
1.
class RequestThread(threading.Thread):  
1.
    
# 构造函数  
1.
    
def init(
self, thread_name):  
1.
        threading.Thread.init(
self)  
1.
        
self.test_count = 
0  
1.
  
1.
    
# 线程运行的入口函数  
1.
    
def run(
self):  
1.
  
1.
        
self.test_performace()  
1.
  
1.
  
1.
    
def test_performace(
self):  
1.
            
global TOTAL  
1.
            
global SUCC  
1.
            
global FAIL  
1.
            
global EXCEPT  
1.
            
global GT3  
1.
            
global LT3  
1.
            
try:  
1.
                st = time.time()  
1.
                conn = httplib.HTTPConnection(HOST, PORT, 
False)    
1.
                conn.request(
'GET', URI)  
1.
                res = conn.getresponse()    
1.
                
#print 'version:', res.version    
1.
                
#print 'reason:', res.reason    
1.
                
#print 'status:', res.status    
1.
                
#print 'msg:', res.msg    
1.
                
#print 'headers:', res.getheaders()  
1.
                start_time  
1.
                
if res.status == 
200:  
1.
                    TOTAL+=
1  
1.
                    SUCC+=
1  
1.
                
else:  
1.
                    TOTAL+=
1  
1.
                    FAIL+=
1  
1.
                time_span = time.time()-st  
1.
                
print 
'%s:%f\n'%(
self.name,time_span)  
1.
                
self.maxtime(time_span)  
1.
                
self.mintime(time_span)  
1.
                
if time_span>
3:  
1.
                    GT3+=
1  
1.
                
else:  
1.
                    LT3+=
1                      
1.
            
except Exception,e:  
1.
                
print e  
1.
                TOTAL+=
1  
1.
                EXCEPT+=
1  
1.
            conn.close()  
1.
    
def maxtime(
self,ts):  
1.
            
global MAXTIME  
1.
            
print ts  
1.
            
if ts>MAXTIME:  
1.
                MAXTIME=ts  
1.
    
def mintime(
self,ts):  
1.
            
global MINTIME  
1.
            
if ts<MINTIME:  
1.
                MINTIME=ts  
1.
          
1.
# main 代码开始  
1.
print 
'===========task start==========='  
1.
# 开始的时间  
1.
start_time = time.time()  
1.
# 并发的线程数  
1.
thread_count = 
300  
1.
  
1.
i = 
0  
1.
while i <= thread_count:  
1.
    t = RequestThread(
"thread" + str(i))  
1.
    t.start()  
1.
    i += 
1  
1.
t=
0  
1.
#并发数所有都完成或大于50秒就结束  
1.
while TOTAL<thread_count|t>
50:  
1.
        
print 
"total:%d,succ:%d,fail:%d,except:%d\n"%(TOTAL,SUCC,FAIL,EXCEPT)  
1.
        
print HOST,URI  
1.
        t+=
1  
1.
        time.sleep(
1)  
1.
print 
'===========task end==========='  
1.
print 
"total:%d,succ:%d,fail:%d,except:%d"%(TOTAL,SUCC,FAIL,EXCEPT)  
1.
print 
'response maxtime:',MAXTIME  
1.
print 
'response mintime',MINTIME  
1.
print 
'great than 3 seconds:%d,percent:%0.2f'%(GT3,float(GT3)/TOTAL)  
1.
print 
'less than 3 seconds:%d,percent:%0.2f'%(LT3,float(LT3)/TOTAL)  

 300并发试着测了iteye几次,还是被封了一次IP……

给TA打赏
共{{data.count}}人
人已打赏
安全技术

C++ 中 struct和class 的区别

2022-1-11 12:36:11

安全技术

Core Java (十一) Java 继承,类,超类和子类

2022-1-11 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索