专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

golang net/http的简单分析

package main

import (
        "fmt"
        "net/http"
)

func main() {
        http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                fmt.Fprintf(w, "Hello, world...")
        }))
        http.ListenAndServe(":8080", nil)
}

看一下ListenAndServe到底是如何工作的. ListenAndServe侦听TCP网络地址addr,然后调用Serve with handler处理传入连接上的请求。接受的连接都配置为开启 TCP keep-alives.该处理程序通常为nil,在这种情况下,将使用DefaultServeMux

// ListenAndServe always returns a non-nil error.
func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}

关键还是ListenAndServe,而它的实质还是创建一个tcp Listener,然后srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})

func (srv *Server) ListenAndServe() error {
    if srv.shuttingDown() {
        return ErrServerClosed
    }
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

首先看一下tcpKeepAliveListener,

type tcpKeepAliveListener struct {
    *net.TCPListener
}

func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
    tc, err := ln.AcceptTCP()
    if err != nil {
        return nil, err
    }
    tc.SetKeepAlive(true)
    tc.SetKeepAlivePeriod(3 * time.Minute)
    return tc, nil
}

其结构体实现了Accept方法。该方法设置了开启Tcp keep-alive属性。

再看一下Server的Serve()方法。其接受参数监听器上的连接,然后调用新的服务协程来进行处理请求。服务协程读取请求,然后调用srv.Handler返回给客户端。

func (srv *Server) Serve(l net.Listener) error {
    // 该方法是对其测试的钩子
    if fn := testHookServerServe; fn != nil {
        fn(srv, l) // call hook with unwrapped listener
    }

    l = &onceCloseListener{Listener: l}
    defer l.Close()

    if err := srv.setupHTTP2_Serve(); err != nil {
        return err
    }

    if !srv.trackListener(&l, true) {
        return ErrServerClosed
    }
    defer srv.trackListener(&l, false)

    var tempDelay time.Duration     // how long to sleep on accept failure
    baseCtx := context.Background() // base is always background, per Issue 16220
    // ServerContextKey = &contextKey{"http-server"},该变量是一个全局的上下文的key,
    // 可以在带有context.WithValue的HTTP处理程序中使用它来访问启动处理程序的服务器
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        rw, e := l.Accept()
        if e != nil {
            select {
            case <-srv.getDoneChan():
                return ErrServerClosed
            default:
            }
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
                time.Sleep(tempDelay)
                continue
            }
            return e
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve(ctx)
    }
}

onceCloseListener包裹传进来的Listener.使用sync.Once原语对其进行多放Close调用。该原语支持仅一次方法调用。事实上,sync.Once是一个只执行一个操作的对象。多次调用其上的Do()方法,仅首次调用起效。 看一下Do方法的原型func (o *Once) Do(f func()) 因为对Do的调用只有在对f的调用返回时才会返回,如果f导致Do被调用,那么它就会死锁。 如果f函数异常,Do将任务其已经结束返回,后续对Do的调用将不再调用f。

// onceCloseListener wraps a net.Listener, protecting it from
// multiple Close calls.
type onceCloseListener struct {
    net.Listener
    once     sync.Once
    closeErr error
}

func (oc *onceCloseListener) Close() error {
    oc.once.Do(oc.close)
    return oc.closeErr
}

func (oc *onceCloseListener) close() { oc.closeErr = oc.Listener.Close() }

通过查看Serve()方法,可以看出,其实质上还是一个加强版的服务器版本,剥离之后,和如下的服务器是不是大同小异啊

package main

import (
        "fmt"
        "io"
        "log"
        "net"
)

func main() {
        ln, err := net.Listen("tcp", ":8080")
        if err != nil {
                fmt.Println(err)
                return
        }
        conn, err := ln.Accept()
        if err != nil {
                fmt.Println(err)
                return
        }
        var buf = make([]byte, 10)
        var result string
        for {
                n, err := conn.Read(buf)
                if err == io.EOF {
                        fmt.Println(err.Error())
                        break
                }
                if err != nil {
                        fmt.Println(err)
                        return
                }
                log.Printf("client content is %s\n", string(buf[:n]))
                result += string(buf[:n])
        }
        log.Println(result)
}

具体是如何加强的,可以看看c := srv.newConn(rw)。其在原始的net.Conn上穿了一件衣服.

// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
    c := &conn{
        server: srv,
        rwc:    rwc,
    }
    if debugServerConnections {
        c.rwc = newLoggingConn("server", c.rwc)
    }
    return c
}

穿完了衣服,干了一件事儿c.setState(c.rwc, StateNew) // before Serve can return

func (c *conn) setState(nc net.Conn, state ConnState) {
    srv := c.server
    switch state {
    case StateNew:
        srv.trackConn(c, true)
    case StateHijacked, StateClosed:
        srv.trackConn(c, false)
    }
    if state > 0xff || state < 0 {
        panic("internal error")
    }
    packedState := uint64(time.Now().Unix()<<8) | uint64(state)
    atomic.StoreUint64(&c.curState.atomic, packedState)
    // 当客户端连接发生状态变化时,进行一个回调函数的处理
    if hook := srv.ConnState; hook != nil {
        hook(nc, state)
    }
}

咱们传入的是StateNew,看看他干了啥

func (s *Server) trackConn(c *conn, add bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.activeConn == nil {
        s.activeConn = make(map[*conn]struct{})
    }
    if add {
        s.activeConn[c] = struct{}{}
    } else {
        delete(s.activeConn, c)
    }
}

就是把当前新的连接加入到服务器的活跃的连接中。 然后起了一个goroutine,接管了请求处理。

// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
    c.remoteAddr = c.rwc.RemoteAddr().String()
    ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
    defer func() {
        if err := recover(); err != nil && err != ErrAbortHandler {
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
        }
        if !c.hijacked() {
            c.close()
            c.setState(c.rwc, StateClosed)
        }
    }()

    if tlsConn, ok := c.rwc.(*tls.Conn); ok {
        if d := c.server.ReadTimeout; d != 0 {
            c.rwc.SetReadDeadline(time.Now().Add(d))
        }
        if d := c.server.WriteTimeout; d != 0 {
            c.rwc.SetWriteDeadline(time.Now().Add(d))
        }
        if err := tlsConn.Handshake(); err != nil {
            // If the handshake failed due to the client not speaking
            // TLS, assume they're speaking plaintext HTTP and write a
            // 400 response on the TLS conn's underlying net.Conn.
            if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
                io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
                re.Conn.Close()
                return
            }
            c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
            return
        }
        c.tlsState = new(tls.ConnectionState)
        *c.tlsState = tlsConn.ConnectionState()
        if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
            if fn := c.server.TLSNextProto[proto]; fn != nil {
                h := initNPNRequest{tlsConn, serverHandler{c.server}}
                fn(c.server, tlsConn, h)
            }
            return
        }
    }

    // HTTP/1.x from here on.

    ctx, cancelCtx := context.WithCancel(ctx)
    c.cancelCtx = cancelCtx
    defer cancelCtx()

    c.r = &connReader{conn: c}
    c.bufr = newBufioReader(c.r)
    c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

    for {
        w, err := c.readRequest(ctx)
        if c.r.remain != c.server.initialReadLimitSize() {
            // If we read any bytes off the wire, we're active.
            c.setState(c.rwc, StateActive)
        }
        if err != nil {
            const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

            if err == errTooLarge {
                // Their HTTP client may or may not be
                // able to read this if we're
                // responding to them and hanging up
                // while they're still writing their
                // request. Undefined behavior.
                const publicErr = "431 Request Header Fields Too Large"
                fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
                c.closeWriteAndWait()
                return
            }
            if isCommonNetReadError(err) {
                return // don't reply
            }

            publicErr := "400 Bad Request"
            if v, ok := err.(badRequestError); ok {
                publicErr = publicErr + ": " + string(v)
            }

            fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
            return
        }

        // Expect 100 Continue support
        req := w.req
        if req.expectsContinue() {
            if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
                // Wrap the Body reader with one that replies on the connection
                req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
            }
        } else if req.Header.get("Expect") != "" {
            w.sendExpectationFailed()
            return
        }

        c.curReq.Store(w)

        if requestBodyRemains(req.Body) {
            registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
        } else {
            w.conn.r.startBackgroundRead()
        }

        // HTTP cannot have multiple simultaneous active requests.[*]
        // Until the server replies to this request, it can't read another,
        // so we might as well run the handler in this goroutine.
        // [*] Not strictly true: HTTP pipelining. We could let them all process
        // in parallel even if their responses need to be serialized.
        // But we're not going to implement HTTP pipelining because it
        // was never deployed in the wild and the answer is HTTP/2.
        // HTTP不能同时有多个活动请求。
        // 直到服务器回复该请求,它才能读取另一个请求
        // 因此我们最好在此goroutine中运行处理程序。
        serverHandler{c.server}.ServeHTTP(w, w.req)
        w.cancelCtx()
        if c.hijacked() {
            return
        }
        w.finishRequest()
        if !w.shouldReuseConnection() {
            if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
                c.closeWriteAndWait()
            }
            return
        }
        c.setState(c.rwc, StateIdle)
        c.curReq.Store((*response)(nil))

        if !w.conn.server.doKeepAlives() {
            // We're in shutdown mode. We might've replied
            // to the user without "Connection: close" and
            // they might think they can send another
            // request, but such is life with HTTP/1.1.
            return
        }

        if d := c.server.idleTimeout(); d != 0 {
            c.rwc.SetReadDeadline(time.Now().Add(d))
            if _, err := c.bufr.Peek(4); err != nil {
                return
            }
        }
        c.rwc.SetReadDeadline(time.Time{})
    }
}

先看看context.WithValue,WithValue返回parent的副本,其中与key关联的值为val。仅将上下文值用于传递过程和API的请求范围的数据,而不用于将可选参数传递给函数

func WithValue(parent Context, key, val interface{}) Context {
    if key == nil {
        panic("nil key")
    }
    if !reflect.TypeOf(key).Comparable() {
        panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

在每个goroutine中,如下我们处理请求:

// serverHandler delegates to either the server's Handler or
// DefaultServeMux and also handles "OPTIONS *" requests.
type serverHandler struct {
    srv *Server
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
    handler := sh.srv.Handler
    if handler == nil {
        handler = DefaultServeMux
    }
    if req.RequestURI == "*" && req.Method == "OPTIONS" {
        handler = globalOptionsHandler{}
    }
    handler.ServeHTTP(rw, req)
}

首先其会从Server中检索Handler,如果handler为空,默认的会使用DefaultServeMux,如果请求的URI是*并且请求方式是OPTIONS,会使用globalOptionsHandler. 然后进行handler处理,以上是对整个流程的一个详细介绍,而对于像如下的服务端,也就大差不差了

package main

import (
        "fmt"
        "net/http"
        _ "net/http/pprof"
)

type UserRespone struct{}

func (*UserRespone) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, "Welcome...")
}

func main() {
        mux := http.NewServeMux()
        mux.Handle("/", &UserRespone{})
        http.ListenAndServe(":8080", mux)
}

mux就是一个实现了Handler接口的结构体,自然可以在ListenAndServe中使用。

type ServeMux struct {
    mu    sync.RWMutex
    m     map[string]muxEntry
    es    []muxEntry // slice of entries sorted from longest to shortest.
    hosts bool       // whether any patterns contain hostnames
}
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
    if r.RequestURI == "*" {
        if r.ProtoAtLeast(1, 1) {
            w.Header().Set("Connection", "close")
        }
        w.WriteHeader(StatusBadRequest)
        return
    }
    h, _ := mux.Handler(r)
    h.ServeHTTP(w, r)
}

再来看看

http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                fmt.Fprintf(w, "Hello, world...")
        }))

HandlerFunc是一个适配器,将任何符合func(ResponseWriter, *Request)原型的函数,都可以组装成符合Handler接口的函数,用来作为请求的路径上的处理函数。

// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as HTTP handlers. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
    f(w, r)
}

看看Handler接口的样子,就是一个有ServeHTTP方法声明的接口类型, 任何实现了ServeHTTP方法的类型都可以用在需要接受Handlerd的地方。

文章永久链接:https://tech.souyunku.com/41030

未经允许不得转载:搜云库技术团队 » golang net/http的简单分析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们