go-ethereum之rpc
简介
主要用于DeApp与node之间的交互
结构
Client:服务端接收到一个连接后创建Client来处理请求,表示一个到服务端的连接
type Client struct {idgen func() ID // for subscriptionsisHTTP bool // connection type: http, ws or ipcservices *serviceRegistryidCounter atomic.Uint32// This function, if non-nil, is called when the connection is lost.reconnectFunc reconnectFunc// config fieldsbatchItemLimit intbatchResponseMaxSize int// writeConn is used for writing to the connection on the caller's goroutine. It should// only be accessed outside of dispatch, with the write lock held. The write lock is// taken by sending on reqInit and released by sending on reqSent.writeConn jsonWriter// for dispatchclose chan struct{}closing chan struct{} // closed when client is quittingdidClose chan struct{} // closed when client quitsreconnected chan ServerCodec // where write/reconnect sends the new connectionreadOp chan readOp // read messagesreadErr chan error // errors from readreqInit chan *requestOp // register response IDs, takes write lockreqSent chan error // signals write completion, releases write lockreqTimeout chan *requestOp // removes response IDs when call timeout expires
}
客户端创建Client使用newClient,服务端接收连接创建表示连接的客户端使用initClient。即客户端主要使用下面几个方法
- DialOptions
- DialHTTPWithClient(http)
- DialInProc(inproc进程内)
- DialIPC(进程间)
- DialWebsocketWithDialer或者DialWebsocket(websocket)
func newClient(initctx context.Context, cfg *clientConfig, connect reconnectFunc) (*Client, error) {conn, err := connect(initctx)if err != nil {return nil, err}c := initClient(conn, new(serviceRegistry), cfg)c.reconnectFunc = connectreturn c, nil
}func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig) *Client {_, isHTTP := conn.(*httpConn)c := &Client{isHTTP: isHTTP,services: services,idgen: cfg.idgen,batchItemLimit: cfg.batchItemLimit,batchResponseMaxSize: cfg.batchResponseLimit,writeConn: conn,close: make(chan struct{}),closing: make(chan struct{}),didClose: make(chan struct{}),reconnected: make(chan ServerCodec),readOp: make(chan readOp),readErr: make(chan error),reqInit: make(chan *requestOp),reqSent: make(chan error, 1),reqTimeout: make(chan *requestOp),}// Set defaults.if c.idgen == nil {c.idgen = randomIDGenerator()}// Launch the main loop.if !isHTTP {go c.dispatch(conn)}return c
}
Server:rpc服务器
type Server struct {services serviceRegistryidgen func() IDmutex sync.Mutexcodecs map[ServerCodec]struct{}run atomic.BoolbatchItemLimit intbatchResponseLimit inthttpBodyLimit intwsReadLimit int64
}
serviceRegistry:用于管理服务器对外提供的服务
type serviceRegistry struct {mu sync.Mutexservices map[string]service
}
service:对外提供的一个服务
type service struct {name string // name for servicecallbacks map[string]*callback // registered handlerssubscriptions map[string]*callback // available subscriptions/notifications
}
callback:服务中的一个方法
type callback struct {fn reflect.Value // the functionrcvr reflect.Value // receiver object of method, set if fn is methodargTypes []reflect.Type // input argument typeshasCtx bool // method's first argument is a context (not included in argTypes)errPos int // err return idx, of -1 when method cannot return errorisSubscribe bool // true if this is a subscription callback
}
ServerCodec :网络编解码接口
type ServerCodec interface {peerInfo() PeerInforeadBatch() (msgs []*jsonrpcMessage, isBatch bool, err error)close()jsonWriter
}
