专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Python实现代理服务器测试

测试安装pip 模块成功 参考自:blog.csdn.net/a1053904672…


# encoding:utf-8
import socket
import threading

class Header:
    """
    读取和解析头信息
    """

    def __init__(self, conn):
        self._method = None
        header = b''
        try:
            while 1:
                data = conn.recv(4096)
                header = b"%s%s" % (header, data)
                if header.endswith(b'\r\n\r\n') or (not data):
                    break
        except:
            pass
        self._header = header
        self.header_list = header.split(b'\r\n')
        self._host = None
        self._port = None

    def get_method(self):
        """
        获取请求方式
        :return:
        """
        if self._method is None:
            self._method = self._header[:self._header.index(b' ')]
        return self._method

    def get_host_info(self):
        """
        获取目标主机的ip和端口
        :return:
        """
        if self._host is None:
            method = self.get_method()
            line = self.header_list[0].decode('utf8')
            if method == b"CONNECT":
                host = line.split(' ')[1]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 443
            else:
                for i in self.header_list:
                    if i.startswith(b"Host:"):
                        host = i.split(b" ")
                        if len(host) < 2:
                            continue
                        host = host[1].decode('utf8')
                        break
                else:
                    host = line.split('/')[2]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 80
            self._host = host
            self._port = int(port)
        return self._host, self._port

    @property
    def data(self):
        """
        返回头部数据
        :return:
        """
        return self._header

    def is_ssl(self):
        """
        判断是否为 https协议
        :return:
        """
        if self.get_method() == b'CONNECT':
            return True
        return False

    def __repr__(self):
        return str(self._header.decode("utf8"))

def communicate(sock1, sock2):
    """
    socket之间的数据交换
    :param sock1:
    :param sock2:
    :return:
    """
    try:
        sip1=sock1.getpeername()#sock1.getsockname(), 返回socket自己的地址
        sip2=sock2.getpeername()
        while 1:
            data = sock1.recv(4096)
            if not data:
                #return
                sock2.close()
                break
            sock2.sendall(data)

    except:
        pass
    finally:
        #print(str(sip1)+'-->'+str(sip2)+'断开连接\n')
        print(str(sip1)+'断开连接\n')
        sock1.close()
        #print('the client disconnct')

def handle(client):
    """
    处理连接进来的客户端
    :param client:
    :return:
    """
    timeout = 1000#90
    client.settimeout(timeout)
    header = Header(client)
    if not header.data:
        client.close()
        return
    print(*header.get_host_info(), header.get_method())
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.connect(header.get_host_info())
        server.settimeout(timeout)
        if header.is_ssl():
            data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
            client.sendall(data)

            threading.Thread(target=communicate,args=(client, server)).start()
            threading.Thread(target=communicate,args=(server, client)).start()
        else:#http 模式
            server.sendall(header.data)
            threading.Thread(target=communicate,args=(client, server)).start()
            threading.Thread(target=communicate,args=(server, client)).start()
            #communicate(server, client)#http 模式,使用后断开连接,减少线程开销
    except:
        server.close()
        client.close()

def serve(ip, port):
    """
    代理服务
    :param ip:
    :param port:
    :return:
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((ip, port))
    s.listen(10)
    print('proxy start...')
    while True:
        conn, addr = s.accept()
        threading.Thread(target=handle,args=(conn,)).start()

if __name__ == '__main__':
    IP = "0.0.0.0"
    PORT = 6090
    serve(IP, PORT)

文章永久链接:https://tech.souyunku.com/42149

未经允许不得转载:搜云库技术团队 » Python实现代理服务器测试

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们