微服务之服务发现

服务发现需要提供的功能:

  • 管理服务端口和地址,在某个服务变更之后(IP或者端口),能够确保其他服务依然可以访问到
  • 当服务上线、下线时,能捕捉到服务状态,及时通知到客户端
  • 兼容http的服务和RPC的服务

技术选型

名称 优点 缺点 接口 一致性算法
zookeeper 1. 功能强大,不仅仅只是服务发现;2. 提供watcher机制实现实时获取服务提供者状态 1. 没有健康检查;2. 需要在服务中集成sdk,有复杂度;3. 不支持多数据中心 sdk Paxos(超过半数)
consul 1. 简单易用;2. 自带健康检查;3. 支持多数据中心;4. 提供web管理界面 1. 不能实时获取服务信息的变化通知 http/dns Raft(主从)
etcd 1. 简单易用;2. 可配置性强 1. 没有健康检查;2. 需配合第三方工具一起完成服务发现;3. 不支持多数据中心 http Raft

综上,如果是在golang的技术栈,选择专门做服务发现的 consul方案最好

使用

安装

1
docker run -d -p 8500:8500 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8600:8600/udp --name consul consul consul agent -dev -client=0.0.0.0

8500:注册端口,http的端口

8600dns解析的端口

例如通过 docker 安装在本地

可以通过 http://127.0.0.1:8500打开管理页面,通过 dig @127.0.0.1 -p 8600 consul.service.consul SRV 检查 dns 服务是否正常。

官方 API 文档:Config HTTP Endpoint,提供HTTP交互的能力说明,例如常用的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
// 服务注册
PUT /agent/service/register application/json
// 服务罗列
GET /agent/services application/json
// 通过 ID 获取本地服务运行状况
GET /agent/health/service/id/:service_id application/json
GET /agent/health/service/id/:service_id?format=text text/plain
// 注册服务
PUT /agent/service/register application/json
// 注销服务
PUT /agent/service/deregister/:service_id application/json
// 启用维护模式
PUT /agent/service/maintenance/:service_id application/json

HTTP协议

将服务注册到 consul 中,并且设置HTTP的健康检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func ConsulRegister(address string, port int) error {
cfg := api.DefaultConfig()
// consul 服务端地址和端口
cfg.Address = fmt.Sprintf("%s:%d", address, port)

client, err := api.NewClient(cfg)
if err != nil {
return err
}

check := api.AgentServiceCheck{
// consul服务端能访问到的 gin 服务IP和端口
HTTP: "http://192.168.41.68:8080/version",
Timeout: "5s",
Interval: "5s",
DeregisterCriticalServiceAfter: "10s",
Method: "POST",
}
registration := api.AgentServiceRegistration{
ID: "mitaka-srv", // id是服务唯一标识符,使用uuid更好
Name: "mitaka-srv",
Tags: []string{"mitaka", "golang", "example"},
Port: 8080,
Address: "127.0.0.1",
Check: &check,
}

err = client.Agent().ServiceRegister(&registration)
if err != nil {
return err
}

return nil
}

罗列注册到consul中的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func ConsulList(address string, port int) error {
cfg := api.DefaultConfig()
cfg.Address = fmt.Sprintf("%s:%d", address, port)

client, err := api.NewClient(cfg)
if err != nil {
return err
}

// 全部获取
//services, err := client.Agent().Services()
// 过滤
services, err := client.Agent().ServicesWithFilter(`Service == "mitaka-srv"`)
if err != nil {
return err
}

for name, srv := range services {
fmt.Printf("svc name: %s,svc info: %+v\n", name, srv)
}

return nil
}

gRPC协议

官方文档:GRPC Health Checking Protocol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
syntax = "proto3";

package grpc.health.v1;

message HealthCheckRequest {
string service = 1;
}

message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}

service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

使用

注册gRPC服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func ConsulGRPCRegister(address string, port int) error {
cfg := api.DefaultConfig()
// consul 服务端地址和端口
cfg.Address = fmt.Sprintf("%s:%d", address, port)

client, err := api.NewClient(cfg)
if err != nil {
return err
}

check := api.AgentServiceCheck{
// consul服务端能访问到的 grpc 服务IP和端口
GRPC: "192.168.41.68:8080",
Timeout: "5s",
Interval: "5s",
DeregisterCriticalServiceAfter: "10s",
}
registration := api.AgentServiceRegistration{
ID: "mitaka-grpc",
Name: "mitaka-grpc",
Tags: []string{"mitaka", "golang", "example", "grpc"},
Port: 8080,
Address: "127.0.0.1",
Check: &check,
}

err = client.Agent().ServiceRegister(&registration)
if err != nil {
return err
}

return nil
}

调用注册服务的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import    "google.golang.org/grpc/health"
import "google.golang.org/grpc/health/grpc_health_v1"

func main() {
log.SetFlags(log.Lshortfile)
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}

// 如果使用认证中间件,gRPC的健康检查也需要配置,不然健康检查会不通过
//s := grpc.NewServer(grpc.UnaryInterceptor(auth))
s := grpc.NewServer()

// 注册健康检查
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
err = ConsulGRPCRegister("127.0.0.1", 8500)
if err != nil {
log.Fatal(err)
}

minegrpc.RegisterGreeterServer(s, &Service{})
log.Fatal(s.Serve(listener))
}

随机获取可用端口号,可以用来定义服务端口

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
addr, err := net.ResolveTCPAddr("tcp", ":0")
if err != nil {
log.Fatal(err)
}
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
log.Fatal(err)
}
defer listener.Close()

fmt.Println(listener.Addr().(*net.TCPAddr).Port)
}

其他的接口

服务注销

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func ConsulGRPCDeregister(address string, port int, id string) error {

cfg := api.DefaultConfig()
// consul 服务端地址和端口
cfg.Address = fmt.Sprintf("%s:%d", address, port)

client, err := api.NewClient(cfg)
if err != nil {
return err
}

err = client.Agent().ServiceDeregister(id)
if err != nil {
return err
}

return nil
}

补充:

服务发现的服务端可以由多个方案实现,既可以是consol,也可以是etcd,或者nacos,所以可以将服务注册功能提到一个interface。在服务退出时,需要将服务注销,因此,在优雅退出过程中,需要将注销功能加入到退出过程中。

当没有注销,consul中依然会有已经关闭的服务的状态信息

image-20221009172102291