NiceLeeのBlog 用爱发电 bilibili~

Python 一种自动判断类型的SNI/HTTP/HTTPS代理

2021-03-27
nIceLee

阅读:


听人说起这个idea,感觉挺有意思(虽然没有什么大的用处)

成品Demo

https://github.com/nICEnnnnnnnLee/proxy

前言

  • 在学异步IO和协程,感觉廖雪峰这个例子举得很好,感觉豁然开朗。
    为什么不能在协程里面放需要长时间运行的东西,因为它会阻塞整个进程。
  • 前面实现了简单的SNI代理,现在进一步,将HTTP/HTTPS也代理给加上。
  • 同步的实现里面改为使用select进行轮询,没有再开两个thread。感觉Python本身很多模块很有用处,值得我们花时间去了解熟悉。
  • 以前在学习HTTP协议的时候,尝试着用Java也写了一个代理,感觉花了一番功夫。
    现在同样是基于TCP socket,同样不借助第三方库,百来行代码即可实现,感觉十分轻松。
    不知道是理解上去了,还是…

代码

当前的实现认为客户端第一次发送的消息包含且只包含整个头部。

sni_helper.py

用于从Client Hello消息里面提取host,见前文

proxy_sync.py

同步实现

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket, threading, re, select
import sni_helper

TIME_OUT_ERR = socket.timeout
def recv(socket, bufferSize=1024):
    try:
        return socket.recv(bufferSize)
    except TIME_OUT_ERR: # socket.timeout changes into a number when timeout is set
        if not stop:
            return recv(socket, bufferSize)
            
def socket_handler(clientSock, addr):
    clientSock.settimeout(5)
    data = recv(clientSock)
    if not data:
        return
    is_https_proxy = False
        
    if data.startswith(b'CONNECT'):
        head = data.decode('latin1')
        search = re.search(r'^CONNECT ([^:]+)(?::([0-9]+))? HTTP[0-9/\.]+\r\n', head)
        if search:
            sni = search.group(1)
            port = int(search.group(2)) if search.group(2) else 443
            is_https_proxy = True
    elif data.startswith(b"GET ") or data.startswith(b"POST ") or data.startswith(b"PUT ") or data.startswith(b"DELETE ") or data.startswith(b"OPTIONS ") or data.startswith(b"UPDATE "):
        head = data.decode('latin1')
        search = re.search(r'\r\nHost: ([^:]+)(?::([0-9]+))?\r\n', head)
        if search:
            sni = search.group(1)
            port = int(search.group(2)) if search.group(2) else 80
    else:
        sni = sni_helper.GetSniFromSslPlainText(data)
        port = 443
        
    if 'sni' not in locals():
        print('sni not found')
        return
    #print('Accept new connection from %s:%s...' % addr)
    print('Establishing new connection to %s:%d' %(sni, port))
    
    try:
        serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        #serverSock.connect((sni, port))
        serverSock.connect((getHost(sni), port))
        if is_https_proxy:
            clientSock.send(b'HTTP/1.1 200 Connection Established\r\n\r\n')
        else:
            serverSock.send(data)
        serverSock.settimeout(5)
        '''
        t1 = threading.Thread(target=fromTo, args=(clientSock, serverSock), name='thread-%s-toServer'%sni)
        t2 = threading.Thread(target=fromTo, args=(serverSock, clientSock), name='thread-%s-toClient'%sni)
        t1.start(); t2.start(); #t1.join(); t2.join()
        '''
        fdset = [clientSock, serverSock]
        while not stop:
            r, w, e = select.select(fdset, [], [], 5)
            if clientSock in r:
                if serverSock.send(clientSock.recv(1024)) <= 0: break
            if serverSock in r:
                if clientSock.send(serverSock.recv(1024)) <= 0: break
    except Exception as e:
        pass
    finally:
        print(f'{sni} connection closed')
        clientSock.close()
        serverSock.close()

def getHost(sni):
    host = hosts.get(sni, sni)
    return host
def fromTo(fromSock, toSock):
    try:
        data = recv(fromSock)
        while data and not stop:
            toSock.send(data)
            data = recv(fromSock)
    except:
        pass
    finally:
        fromSock.close()
        toSock.close()

def startServer(port: int = 443, maxLink = 5):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('0.0.0.0', port))
    s.listen(maxLink)
    s.settimeout(5.0)
    #print('Waiting for connection...')
    print(f'Serving on {s.getsockname()}')
    while not stop:
        try:
            sock, addr = s.accept()
            t = threading.Thread(target=socket_handler, args=(sock, addr), name='thread-dealSocket %s:%s'%addr)
            t.start()
        except socket.timeout as e:
            pass


stop = False
hosts = {
    "www.baidu.com":"14.215.177.38",
}
if __name__ == '__main__':
    threadServer = threading.Thread(target=startServer, args=(443, 5), name='thread-startServer')
    threadServer.start()

    try:
        input('Enter any key to stop.\r\n')
    finally:
        stop = True

proxy_async.py

异步实现

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import asyncio, sys, re
import sni_helper

async def socket_handler(client_reader, client_writer):
    checkTasks()
    data = await client_reader.read(1024)
    if not data:
        return
    #addr = writer.get_extra_info('peername')
    is_https_proxy = False
        
    if data.startswith(b'CONNECT'):
        head = data.decode('latin1')
        search = re.search(r'^CONNECT ([^:]+)(?::([0-9]+))? HTTP[0-9/\.]+\r\n', head)
        if search:
            host = search.group(1)
            port = int(search.group(2)) if search.group(2) else 443
            is_https_proxy = True
    elif data.startswith(b"GET ") or data.startswith(b"POST ") or data.startswith(b"PUT ") or data.startswith(b"DELETE ") or data.startswith(b"OPTIONS ") or data.startswith(b"UPDATE "):
        head = data.decode('latin1')
        search = re.search(r'\r\nHost: ([^:]+)(?::([0-9]+))?\r\n', head)
        if search:
            host = search.group(1)
            port = int(search.group(2)) if search.group(2) else 80
    else:
        host = sni_helper.GetSniFromSslPlainText(data)
        port = 443
        
    if 'host' not in locals():
        print('host not found')
        return
    server_reader, server_writer = await asyncio.open_connection(getHost(host), port)
    if is_https_proxy:
        client_writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n')
    else:
        server_writer.write(data)
    # use this if you wanna keep the connetion alive util the client/server close it
    #task = asyncio.create_task(pip(client_reader, server_writer))
    # close the connection if it lives for 2 min
    task = asyncio.create_task(asyncio.wait_for(pip(client_reader, server_writer), timeout=120.0))
    tasks.append(task)
    task = asyncio.create_task(pip(server_reader, client_writer))
    tasks.append(task)
    
async def pip(from_reader, to_writer):
    try:
        await to_writer.drain()
        data = await from_reader.read(1024)
        while data:
            to_writer.write(data)
            await to_writer.drain()
            data = await from_reader.read(1024)
    except:
        pass
    finally:
        if not to_writer.is_closing():
            to_writer.close()
            await to_writer.wait_closed()

async def serve_forever(server):
    try:
        async with server:
            await server.serve_forever()
    except:
        pass

async def main():
    server = await asyncio.start_server(
        socket_handler, '0.0.0.0', 1081)
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')
    task = asyncio.create_task(serve_forever(server))
    tasks.append(task)
    loop = asyncio.get_event_loop()
    inp = await loop.run_in_executor(None, input, 'Enter anything to stop.\r\n')
    for task in tasks:
        task.cancel()

def getHost(sni):
    host = hosts.get(sni, sni)
    return host
def checkTasks():
    if len(tasks) >= 30:
        for task in tasks:
            if  task.done():
                tasks.remove(task)
hosts = {
    "www.baidu.com":"14.215.177.38",
}
tasks = []
if __name__ == '__main__':
    asyncio.run(main())

相似文章

内容
隐藏