1
2
3
4
5
最近在学习 python 多线程,写了一个多线程目录扫描脚本练练手
目前实现功能:
指定线程
指定字典
添加返回的状态码检测(默认200)

2019.5.18(初次完成)

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import argparse
import threading
from queue import Queue
import requests
import sys
import time

# 用户参数获取
def GetValue():
parser = argparse.ArgumentParser(description='By wEik1')
parser.add_argument("-u","--url",dest="url",help="URL")
parser.add_argument("-p","--dicf",dest="dicf",help="Dictionary",default=sys.path[0]+'/dictionary')
parser.add_argument("-t","--threads",type=int,dest="thread_count",help="Threads Count",default=10)
parser.add_argument("-s","--status_code",type=int,dest="status_code",help="status_code,Automatically added after setting (keep default),default=>['200']",default=None)
parser.add_argument("-m","-mode",dest="mode",help="mode[q=>fast(default),n=>normal]",default='q')
args = parser.parse_args()
#用户输入处理
Value = {}
Value['url'] = str(args.url)
Value['dicf'] = args.dicf
Value['thread_count'] = args.thread_count
Value['status_code'] = ['200']
if not args.status_code == None:
Value['status_code'].append(args.status_code)
else:
Value['status_code'] = [200]
Value['mode']= args.mode

return Value

# 将字典与url 组合 并存入队列
def GetDir(url='http://127.0.0.1',dicf='D:\Python\多线程扫描器/dictionary'):
queue = Queue() #创建队列储存字典

for line in open(dicf,'r'): #读取字典存入队列
line = line.strip(' ')
line = line.strip('\n')
url = url.strip(' ')
url = url.strip('/')
queue.put(url+line)
# while not queue.empty():
# print(queue.get())
return queue
#定义线程
class ScanDir(threading.Thread): #继承Thread类
def __init__(self,queue,status_code,mode):
threading.Thread.__init__(self)
self.queue = queue
self.status_code = status_code
self.mode = mode
#这一步不知道是干嘛的。应该是用这种方法是定义线程必须的()
#创建一个线程对象,只要继承类threading.Thread,然后在__ init__里边调用threading.Thread.__init__()方法即可
def run(self):
while not self.queue.empty():
u = queue.get()
re = requests.get(u)
if re.status_code in self.status_code:
print('['+str(re.status_code)+']=>'+u)
#多线程调用
def Start_Scan(queue,args):
threads = []
thread_count = args['thread_count']
for i in range(thread_count):
threads.append(ScanDir(queue,args['status_code'],args['mode']))
for t in threads:
t.start()
for t in threads:
t.join()


if __name__ == "__main__":
start_time = time.time()
args = GetValue()
queue = GetDir(args['url'],args['dicf'])
# S = ScanDir(queue,args['status_code'],args['mode'])
# S.run()
Start_Scan(queue,args)
end_time = time.time()
print('Completed! time:{0}'.format(end_time-start_time))

效果图

​ 单线程

1558149711651

​ 多线程

1558149695984