WEBKT

手把手教你用 gRPC 实现简易服务发现与负载均衡,微服务扩容不再愁

144 0 0 0

在微服务架构中,服务发现和负载均衡是两个至关重要的环节。服务发现让客户端能够找到可用的服务实例,而负载均衡则确保请求能够均匀地分发到这些实例上,从而提高系统的可用性和性能。gRPC 作为一种高性能的 RPC 框架,非常适合用于构建微服务。本文将介绍如何基于 gRPC 实现一个简单的服务发现和负载均衡机制,以便更好地管理和扩展服务。

1. 为什么需要服务发现和负载均衡?

想象一下,如果没有服务发现和负载均衡,你的微服务架构会变成什么样?

  • 服务地址硬编码: 客户端需要硬编码服务提供者的 IP 地址和端口。一旦服务提供者发生变更(例如 IP 地址变了,或者端口换了),客户端也需要跟着修改,这简直是噩梦!
  • 手动负载均衡: 如果有多个服务提供者实例,客户端需要自己实现负载均衡逻辑,这无疑增加了客户端的复杂性。
  • 单点故障: 如果某个服务提供者实例宕机,客户端可能会一直尝试连接这个失效的实例,导致服务不可用。

服务发现和负载均衡可以完美地解决这些问题:

  • 服务发现: 客户端通过服务发现机制,可以动态地获取可用的服务提供者实例列表,无需硬编码。
  • 负载均衡: 客户端可以使用负载均衡算法,将请求均匀地分发到不同的服务提供者实例上,提高系统的吞吐量和可用性。

2. 基于 gRPC 实现服务发现和负载均衡的思路

我们的目标是实现一个简单的服务发现和负载均衡机制,主要包括以下几个组件:

  • 服务注册中心: 用于存储服务提供者的信息,例如 IP 地址、端口等。可以使用 ZooKeeper、etcd、Consul 等作为服务注册中心。
  • 服务提供者: 启动时将自己的信息注册到服务注册中心。
  • 服务消费者: 从服务注册中心获取可用的服务提供者列表,并使用负载均衡算法选择一个实例发起调用。
  • 负载均衡器: 负责从服务注册中心获取服务列表,并根据负载均衡算法选择一个服务实例。gRPC 提供了 Client Interceptors 机制,可以方便地实现负载均衡器。

3. 动手实现:代码示例

为了简化示例,我们使用 etcd 作为服务注册中心,并实现一个简单的 Round Robin 负载均衡算法。

3.1. 定义 gRPC 服务

首先,我们需要定义一个 gRPC 服务。这里以一个简单的 Greeter 服务为例:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "GreeterProto";

package greeter;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

使用 protoc 命令生成 gRPC 代码。

3.2. 服务提供者

服务提供者需要做以下几件事:

  1. 实现 gRPC 服务接口。
  2. 将自己的信息注册到 etcd。
  3. 监听关闭信号,并在关闭时从 etcd 注销。
import com.example.grpc.GreeterProto;
import greeter.GreeterGrpc;
import greeter.HelloReply;
import greeter.HelloRequest;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import io.etcd.jetcd.EtcdClient;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.options.PutOption;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GreeterServer {

  private static final Logger logger = LoggerFactory.getLogger(GreeterServer.class);
  private final int port;
  private final Server server;
  private final EtcdClient etcdClient;
  private final String serviceName = "greeter";
  private final String instanceId = java.util.UUID.randomUUID().toString();
  private final String serviceAddress;
  private long leaseId;

  public GreeterServer(int port, String etcdEndpoints) {
    this.port = port;
    this.serviceAddress = "localhost:" + port;
    this.server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build();
    this.etcdClient = EtcdClient.builder().endpoints(etcdEndpoints.split(",")).build();
  }

  public static void main(String[] args) throws Exception {
    int port = 8080;
    String etcdEndpoints = "http://localhost:2379";
    if (args.length > 0) {
      port = Integer.parseInt(args[0]);
    }
    if (args.length > 1) {
      etcdEndpoints = args[1];
    }
    GreeterServer server = new GreeterServer(port, etcdEndpoints);
    server.start();
    server.blockUntilShutdown();
  }

  public void start() throws IOException {
    server.start();
    logger.info("Server started, listening on " + port);
    registerService();
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                  System.err.println("\n*** shutting down gRPC server since JVM is shutting down");
                  GreeterServer.this.stop();
                  System.err.println("*** server shut down");
                }));
  }

  public void stop() {
    unregisterService();
    if (server != null) {
      server.shutdown();
    }
  }

  /** Await termination on the main thread since the grpc library uses daemon threads. */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  private void registerService() {
    Lease leaseClient = etcdClient.getLeaseClient();
    KV kvClient = etcdClient.getKVClient();

    // grant lease
    LeaseGrantResponse leaseGrantResponse = leaseClient.grant(10).join();
n    leaseId = leaseGrantResponse.getID();
    logger.info("lease id: {}", leaseId);

    // keep alive lease
    leaseClient.keepAlive(leaseId, (observer) -> {});

    // put service info
    String key = String.format("/%s/%s", serviceName, instanceId);
    String val = serviceAddress;
    PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
    kvClient.put(io.etcd.jetcd.ByteSequence.from(key.getBytes()), io.etcd.jetcd.ByteSequence.from(val.getBytes()), putOption).join();
    logger.info("register service with key: {}, val: {}", key, val);
  }

  private void unregisterService() {
    KV kvClient = etcdClient.getKVClient();
    String key = String.format("/%s/%s", serviceName, instanceId);
    kvClient.delete(io.etcd.jetcd.ByteSequence.from(key.getBytes())).join();
    logger.info("unregister service with key: {}", key);

    Lease leaseClient = etcdClient.getLeaseClient();
    leaseClient.revoke(leaseId).join();
  }

  static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

    @Override
    public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
      HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }
}

3.3. 服务消费者

服务消费者需要做以下几件事:

  1. 从 etcd 获取可用的服务提供者列表。
  2. 使用 Round Robin 算法选择一个实例。
  3. 使用 gRPC 客户端调用服务。
import com.example.grpc.GreeterProto;
import greeter.GreeterGrpc;
import greeter.HelloReply;
import greeter.HelloRequest;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.watch.WatchResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GreeterClient {

  private static final Logger logger = LoggerFactory.getLogger(GreeterClient.class);
  private final String serviceName = "greeter";
  private final String etcdEndpoints;
  private List<String> serviceAddresses = new ArrayList<>();
  private int current = 0;

  public GreeterClient(String etcdEndpoints) {
    this.etcdEndpoints = etcdEndpoints;
    initServiceList();
    watchServiceList();
  }

  public static void main(String[] args) throws Exception {
    String etcdEndpoints = "http://localhost:2379";
    if (args.length > 0) {
      etcdEndpoints = args[0];
    }
    GreeterClient client = new GreeterClient(etcdEndpoints);

    for (int i = 0; i < 10; i++) {
      String message = client.sayHello("world");
      System.out.println("Response: " + message);
      Thread.sleep(1000);
    }
  }

  private void initServiceList() {
    Client client = Client.builder().endpoints(etcdEndpoints.split(",")).build();
    KV kvClient = client.getKVClient();
    ByteSequence prefix = ByteSequence.from(String.format("/%s", serviceName).getBytes());
    GetResponse response;
    try {
      response = kvClient.get(prefix).get();
      List<KeyValue> kvs = response.getKvs();
      for (KeyValue kv : kvs) {
        String address = new String(kv.getValue().getBytes());
        serviceAddresses.add(address);
        logger.info("found service: {}", address);
      }
    } catch (Exception e) {
      logger.error("failed to get service list from etcd", e);
    }
  }

  private void watchServiceList() {
    Client client = Client.builder().endpoints(etcdEndpoints.split(",")).build();
    Watch watchClient = client.getWatchClient();
    ByteSequence prefix = ByteSequence.from(String.format("/%s", serviceName).getBytes());

    CompletableFuture<Watch.Watcher> future =
        CompletableFuture.supplyAsync(
            () -> {
              Watch.Watcher watcher =
                  watchClient.watch(prefix, response -> updateServiceList(response));
              return watcher;
            });
  }

  private synchronized void updateServiceList(WatchResponse response) {
    response
        .getEvents()
        .forEach(
            event -> {
              String key = new String(event.getKeyValue().getKey().getBytes());
              String address = new String(event.getKeyValue().getValue().getBytes());
              switch (event.getType()) {
                case PUT:
                  if (!serviceAddresses.contains(address)) {
                    serviceAddresses.add(address);
                    logger.info("service added: {}", address);
                  }
                  break;
                case DELETE:
                  serviceAddresses.remove(address);
                  logger.info("service removed: {}", address);
                  break;
                default:
                  break;
              }
            });
  }

  public String sayHello(String name) {
    String address = selectServiceAddress();
    if (address == null) {
      return "No service available";
    }

    ManagedChannel channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
    GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
    HelloRequest request = HelloRequest.newBuilder().setName(name).build();
    HelloReply response;
    try {
      response = stub.sayHello(request);
    } finally {
      channel.shutdown();
    }
    return response.getMessage();
  }

  private String selectServiceAddress() {
    if (serviceAddresses.isEmpty()) {
      return null;
    }

    synchronized (this) {
      String address = serviceAddresses.get(current % serviceAddresses.size());
      current++;
      return address;
    }
  }
}

3.4. 运行示例

  1. 启动 etcd。
  2. 启动多个 GreeterServer 实例,指定不同的端口,并指向同一个 etcd 地址。
  3. 启动 GreeterClient,指定 etcd 地址。

你可以看到客户端能够从 etcd 获取可用的服务提供者列表,并使用 Round Robin 算法将请求分发到不同的实例上。

4. 进阶:更完善的负载均衡策略

上面的示例只使用了简单的 Round Robin 算法。在实际应用中,你可能需要更完善的负载均衡策略,例如:

  • 加权 Round Robin: 根据服务提供者的性能,赋予不同的权重,让性能更好的实例处理更多的请求。
  • Least Connections: 选择当前连接数最少的实例。
  • 随机算法: 随机选择一个实例。
  • 一致性哈希: 根据请求的某个属性(例如用户 ID),将请求路由到同一个实例,适用于有状态的服务。

你可以根据自己的需求,选择合适的负载均衡算法,并将其集成到 gRPC 客户端中。

5. 总结

本文介绍了如何基于 gRPC 实现一个简单的服务发现和负载均衡机制。通过服务注册中心(例如 etcd)和服务消费者,可以动态地管理和扩展微服务。希望本文能够帮助你更好地理解 gRPC 在微服务架构中的应用。

当然,这只是一个简单的示例。在实际应用中,你还需要考虑更多因素,例如:

  • 服务健康检查: 定期检查服务提供者是否可用,并将失效的实例从服务列表中移除。
  • 容错处理: 当某个服务提供者不可用时,自动切换到其他实例。
  • 配置管理: 将服务发现和负载均衡的配置信息集中管理,方便修改和维护。

希望这些建议能够帮助你构建更健壮、更可靠的微服务架构。

码农小李 gRPC微服务服务发现

评论点评