听人说起这个idea,感觉挺有意思(虽然没有什么大的用处)
成品Demo
https://github.com/nICEnnnnnnnLee/proxy
前言
- 在学异步IO和协程,感觉廖雪峰这个例子举得很好,感觉豁然开朗。
为什么不能在协程里面放需要长时间运行的东西?因为它会阻塞整个事件循环,进而卡住整个进程。
- 前面实现了简单的SNI代理,现在更进一步,把HTTP/HTTPS代理也加上。
- 同步的实现里面改为使用select进行轮询,没有再开两个thread。感觉Python本身很多模块很有用处,值得我们花时间去了解熟悉。
- 以前在学习HTTP协议的时候,尝试着用Java也写了一个代理,感觉花了一番功夫。
现在同样是基于TCP socket,同样不借助第三方库,百来行代码即可实现,感觉十分轻松。
不知道是理解上去了,还是…
代码
当前的实现认为客户端第一次发送的消息包含且只包含整个头部。
sni_helper.py
用于从Client Hello消息里面提取host,见前文
proxy_sync.py
同步实现
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket, threading, re, select
import sni_helper
# Captured at import time: recv() names its parameter `socket`, which hides the
# module inside the function, so `socket.timeout` there would be the socket
# object's numeric timeout attribute rather than the exception class.
TIME_OUT_ERR = socket.timeout


def recv(socket, bufferSize=1024):
    """Read from a socket, retrying for as long as the read keeps timing out.

    Args:
        socket: Socket object to read from (shadows the ``socket`` module).
        bufferSize: Maximum number of bytes to read in one call.

    Returns:
        Whatever ``socket.recv`` returns, or ``None`` when a read timed out
        and the module-level ``stop`` flag is set.
    """
    # Retry in a loop rather than by recursion: every timeout used to add a
    # stack frame, so a connection that stayed idle long enough would die
    # with RecursionError.
    while True:
        try:
            return socket.recv(bufferSize)
        except TIME_OUT_ERR:
            if stop:
                return None
def _detect_target(data):
    """Work out the upstream host and port from the first client packet.

    Returns:
        ``(host, port, is_connect)`` or ``None`` when no host can be found.
        ``is_connect`` is True only for an HTTP ``CONNECT`` request.
    """
    if data.startswith(b'CONNECT'):
        found = re.search(r'^CONNECT ([^:]+)(?::([0-9]+))? HTTP[0-9/\.]+\r\n',
                          data.decode('latin1'))
        if not found:
            return None
        return found.group(1), int(found.group(2) or 443), True

    # "UPDATE " is not a standard HTTP method but is kept for compatibility.
    http_methods = (b"GET ", b"POST ", b"PUT ", b"DELETE ", b"OPTIONS ",
                    b"UPDATE ", b"HEAD ", b"PATCH ")
    if data.startswith(http_methods):
        # Exclude CR/LF from the host group: with plain `[^:]+` a Host header
        # that is the last header line was captured as "example.com\r\n".
        found = re.search(r'\r\nHost: ([^:\r\n]+)(?::([0-9]+))?\r\n',
                          data.decode('latin1'), re.IGNORECASE)
        if not found:
            return None
        return found.group(1), int(found.group(2) or 80), False

    # Anything else is treated as a raw TLS ClientHello.
    # NOTE(review): assumes the helper returns a falsy value when no SNI is
    # present - confirm against sni_helper.
    sni = sni_helper.GetSniFromSslPlainText(data)
    if not sni:
        return None
    return sni, 443, False


def _relay(clientSock, serverSock):
    """Shuttle data both ways until one side closes or ``stop`` is set."""
    other_side = {clientSock: serverSock, serverSock: clientSock}
    while not stop:
        readable, _, _ = select.select([clientSock, serverSock], [], [], 5)
        for source in readable:
            chunk = source.recv(1024)
            if not chunk:
                return
            # sendall: a plain send() may write only part of the chunk.
            other_side[source].sendall(chunk)


def socket_handler(clientSock, addr):
    """Serve one accepted client: find its target, connect, then relay.

    The first packet is assumed to contain the complete request head (or the
    TLS ClientHello). Both sockets are always closed before returning.

    Args:
        clientSock: The accepted client socket.
        addr: Client address as returned by ``accept()``.
    """
    serverSock = None
    sni = None
    try:
        clientSock.settimeout(5)
        data = recv(clientSock)
        if not data:
            return

        target = _detect_target(data)
        if target is None:
            print('sni not found')
            return
        sni, port, is_https_proxy = target

        print('Establishing new connection to %s:%d' % (sni, port))
        serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        serverSock.connect((getHost(sni), port))
        if is_https_proxy:
            # CONNECT: acknowledge the tunnel; the request itself is not forwarded.
            clientSock.sendall(b'HTTP/1.1 200 Connection Established\r\n\r\n')
        else:
            # Plain HTTP or raw TLS: the first packet belongs to the upstream.
            serverSock.sendall(data)
        serverSock.settimeout(5)

        # A single select() loop replaces the earlier two-thread relay.
        _relay(clientSock, serverSock)
    except Exception as exc:
        # Best effort per connection, but say why it ended.
        print(f'{sni or addr} connection error: {exc!r}')
    finally:
        if sni is not None:
            print(f'{sni} connection closed')
        clientSock.close()
        if serverSock is not None:
            serverSock.close()
def getHost(sni):
    """Return the address to dial for ``sni``.

    Names listed in the module-level ``hosts`` table map to their configured
    address; any other name is returned unchanged.
    """
    if sni in hosts:
        return hosts[sni]
    return sni
def fromTo(fromSock, toSock):
    """Copy data one way from ``fromSock`` to ``toSock``, then close both.

    Copying ends when ``recv`` returns nothing, when the module-level ``stop``
    flag is set, or when a socket error occurs.

    Args:
        fromSock: Socket to read from.
        toSock: Socket to write to.
    """
    try:
        data = recv(fromSock)
        while data and not stop:
            # sendall: a plain send() may write only part of the buffer.
            toSock.sendall(data)
            data = recv(fromSock)
    except OSError:
        # A reset or timeout simply ends this direction of the relay.
        pass
    finally:
        fromSock.close()
        toSock.close()
def startServer(port: int = 443, maxLink = 5):
    """Accept connections on ``0.0.0.0:port`` until ``stop`` is set.

    Each accepted connection is handed to ``socket_handler`` on its own
    thread. The listening socket is closed when the loop ends or setup fails.

    Args:
        port: TCP port to listen on.
        maxLink: Backlog passed to ``listen()``.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(('0.0.0.0', port))
        s.listen(maxLink)
        # Time out accept() so the stop flag is re-checked every few seconds.
        s.settimeout(5.0)
        print(f'Serving on {s.getsockname()}')
        while not stop:
            try:
                sock, addr = s.accept()
            except socket.timeout:
                continue
            t = threading.Thread(target=socket_handler, args=(sock, addr),
                                 name='thread-dealSocket %s:%s' % addr[:2])
            t.start()
    finally:
        # Previously the listener was never closed.
        s.close()
# Shutdown flag read by the accept loop and the relay loops.
stop = False

# Static overrides: host name -> address to connect to instead.
hosts = {"www.baidu.com": "14.215.177.38"}

if __name__ == '__main__':
    # Run the accept loop on a worker thread so the main thread can wait
    # for the operator to press Enter.
    threadServer = threading.Thread(
        name='thread-startServer',
        target=startServer,
        args=(443, 5),
    )
    threadServer.start()
    try:
        input('Enter any key to stop.\r\n')
    finally:
        stop = True
proxy_async.py
异步实现
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio, sys, re
import sni_helper
def _extract_destination(data):
    """Work out the upstream host and port from the first client packet.

    Returns:
        ``(host, port, is_connect)`` or ``None`` when no host can be found.
        ``is_connect`` is True only for an HTTP ``CONNECT`` request.
    """
    if data.startswith(b'CONNECT'):
        found = re.search(r'^CONNECT ([^:]+)(?::([0-9]+))? HTTP[0-9/\.]+\r\n',
                          data.decode('latin1'))
        if not found:
            return None
        return found.group(1), int(found.group(2) or 443), True

    # "UPDATE " is not a standard HTTP method but is kept for compatibility.
    http_methods = (b"GET ", b"POST ", b"PUT ", b"DELETE ", b"OPTIONS ",
                    b"UPDATE ", b"HEAD ", b"PATCH ")
    if data.startswith(http_methods):
        # Exclude CR/LF from the host group: with plain `[^:]+` a Host header
        # that is the last header line was captured as "example.com\r\n".
        found = re.search(r'\r\nHost: ([^:\r\n]+)(?::([0-9]+))?\r\n',
                          data.decode('latin1'), re.IGNORECASE)
        if not found:
            return None
        return found.group(1), int(found.group(2) or 80), False

    # Anything else is treated as a raw TLS ClientHello.
    # NOTE(review): assumes the helper returns a falsy value when no SNI is
    # present - confirm against sni_helper.
    host = sni_helper.GetSniFromSslPlainText(data)
    if not host:
        return None
    return host, 443, False


async def socket_handler(client_reader, client_writer):
    """Handle one client: find its target, connect, start the two pipes.

    The first read is assumed to contain the complete request head (or the
    TLS ClientHello). If the pipes are never started, every writer opened so
    far is closed before returning.

    Args:
        client_reader: Stream reader for the accepted client.
        client_writer: Stream writer for the accepted client.
    """
    checkTasks()
    host = None
    server_writer = None
    piping = False
    try:
        data = await client_reader.read(1024)
        if not data:
            return

        target = _extract_destination(data)
        if target is None:
            print('host not found')
            return
        host, port, is_https_proxy = target

        server_reader, server_writer = await asyncio.open_connection(getHost(host), port)
        if is_https_proxy:
            # CONNECT: acknowledge the tunnel; the request itself is not forwarded.
            client_writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n')
        else:
            # Plain HTTP or raw TLS: the first packet belongs to the upstream.
            server_writer.write(data)

        # The client->server pipe is capped at 2 minutes; drop the wait_for
        # wrapper to keep connections open until either peer closes.
        task = asyncio.create_task(asyncio.wait_for(pip(client_reader, server_writer), timeout=120.0))
        tasks.append(task)
        task = asyncio.create_task(pip(server_reader, client_writer))
        tasks.append(task)
        piping = True
    except OSError as exc:
        # A failed read or connect (refused, DNS failure, reset) ends only this client.
        print(f'{host} connection error: {exc!r}')
    finally:
        if not piping:
            # The pipes close the writers themselves once they are running.
            for writer in (client_writer, server_writer):
                if writer is not None and not writer.is_closing():
                    writer.close()
async def pip(from_reader, to_writer):
    """Pipe data from ``from_reader`` into ``to_writer`` until EOF.

    The writer is always closed on exit. Connection errors and task
    cancellation end the pipe quietly; cancellation is how both ``main`` and
    the ``wait_for`` timeout stop a pipe. Any other exception propagates.

    Args:
        from_reader: Stream reader to read from.
        to_writer: Stream writer to write to.
    """
    try:
        # Flush anything written to the writer before the pipe started.
        await to_writer.drain()
        while True:
            data = await from_reader.read(1024)
            if not data:
                break
            to_writer.write(data)
            await to_writer.drain()
    except (OSError, asyncio.CancelledError):
        pass
    finally:
        if not to_writer.is_closing():
            to_writer.close()
        try:
            await to_writer.wait_closed()
        except (OSError, asyncio.CancelledError):
            # wait_closed() can raise the error that broke the connection;
            # the writer is being torn down anyway.
            pass
async def serve_forever(server):
    """Run ``server`` inside its async context until cancelled.

    Cancellation is the normal way to stop and is absorbed silently. Any
    other failure is printed so it is no longer hidden; it is not re-raised.

    Args:
        server: The server object to run.
    """
    try:
        async with server:
            await server.serve_forever()
    except asyncio.CancelledError:
        pass
    except Exception as exc:
        print(f'Server stopped unexpectedly: {exc!r}')
async def main():
    """Start the proxy on ``0.0.0.0:1081`` and run until the operator presses Enter.

    The serving task is recorded in the module-level ``tasks`` list; after
    the prompt returns, every task in that list is cancelled.
    """
    listener = await asyncio.start_server(socket_handler, '0.0.0.0', 1081)
    print(f'Serving on {listener.sockets[0].getsockname()}')
    tasks.append(asyncio.create_task(serve_forever(listener)))

    # input() blocks, so it runs in the default executor and the event loop
    # keeps serving while waiting.
    await asyncio.get_event_loop().run_in_executor(
        None, input, 'Enter anything to stop.\r\n')

    for pending in tasks:
        pending.cancel()
def getHost(sni):
    """Look ``sni`` up in the module-level ``hosts`` table.

    Returns the configured address when there is one, otherwise ``sni`` itself.
    """
    return hosts.get(sni, sni)
def checkTasks():
    """Drop finished tasks from ``tasks`` once it holds 30 or more entries.

    The list is filtered in place so other references to it stay valid.
    Removing items while iterating the same list, as before, skipped the
    element after each removal and left finished tasks behind.
    """
    if len(tasks) >= 30:
        tasks[:] = [task for task in tasks if not task.done()]
# Static overrides: host name -> address to connect to instead.
hosts = {"www.baidu.com": "14.215.177.38"}

# Every task created by this module, so main() can cancel them on shutdown.
tasks = []

if __name__ == '__main__':
    asyncio.run(main())