silly.net.grpc
silly.net.grpc
silly.net.grpc 模块提供了 gRPC 协议的服务器端和客户端实现。它基于 HTTP/2 传输层,支持标准的 Protocol Buffers 序列化,实现了 Unary RPC 和 Streaming RPC 调用模式。框架自动处理连接管理、请求路由、错误处理等细节,让开发者专注于业务逻辑。
模块导入
local grpc = require "silly.net.grpc"核心概念
gRPC 协议
gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers:
- HTTP/2 传输: 利用多路复用、头部压缩等特性提升性能
- Protocol Buffers: 高效的二进制序列化格式
- 强类型接口: 通过 .proto 文件定义清晰的服务契约
- 流式支持: 支持客户端流、服务器流、双向流
RPC 调用模式
框架支持四种 gRPC 调用模式:
- Unary RPC: 单请求单响应(最常用)
- Server Streaming RPC: 单请求多响应流
- Client Streaming RPC: 多请求流单响应
- Bidirectional Streaming RPC: 双向流式通信
Protocol Buffers
使用 protoc 模块定义和加载服务接口:
local protoc = require "protoc"
local p = protoc:new()
p:load([[
syntax = "proto3";
package mypackage;
message Request {
string name = 1;
int32 value = 2;
}
message Response {
string result = 1;
}
service MyService {
rpc MyMethod (Request) returns (Response) {}
}
]], "myservice.proto")状态码
gRPC 使用标准的状态码表示操作结果,定义在 silly.net.grpc.code 模块:
code.OK(0): 成功code.Canceled(1): 操作被取消code.Unknown(2): 未知错误code.InvalidArgument(3): 无效参数code.DeadlineExceeded(4): 超时code.NotFound(5): 资源未找到code.AlreadyExists(6): 资源已存在code.PermissionDenied(7): 权限不足code.ResourceExhausted(8): 资源耗尽code.Unauthenticated(16): 未认证
服务器端 API
registrar.new()
创建一个新的 gRPC 服务注册器。
- 参数: 无
- 返回值:
registrar- 服务注册器对象 - 示例:
local registrar = require "silly.net.grpc.registrar"
local reg = registrar.new()registrar:register(proto, service)
向注册器注册服务实现。
- 参数:
proto:table- protoc 加载的 proto 定义(包含 package 和 service 信息)service:table- 服务实现表,键为方法名,值为处理函数
- 返回值: 无
- 注意: 处理函数签名为
function(request) -> response - 示例:
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local p = protoc:new()
p:load([[
syntax = "proto3";
package hello;
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
]], "hello.proto")
local reg = registrar.new()
reg:register(p.loaded["hello.proto"], {
SayHello = function(request)
return {
message = "Hello, " .. request.name
}
end
})grpc.listen(conf)
创建 gRPC 服务器并开始监听。
- 参数:
conf:table- 服务器配置表addr:string(必需) - 监听地址,例如"127.0.0.1:50051"registrar:registrar(必需) - 服务注册器对象tls:boolean|nil(可选) - 是否启用 TLS,默认 falsecerts:table[]|nil(可选) - TLS 证书配置(tls=true 时必需)cert:string- PEM 格式证书key:string- PEM 格式私钥
alpnprotos:string[]|nil(可选) - ALPN 协议列表,默认{"h2"}ciphers:string|nil(可选) - TLS 密码套件配置backlog:integer|nil(可选) - 监听队列大小
- 返回值:
- 成功:
server- 服务器对象 - 失败:
nil, string- nil 和错误信息
- 成功:
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package calculator;
message AddRequest {
int32 a = 1;
int32 b = 2;
}
message AddResponse {
int32 result = 1;
}
service Calculator {
rpc Add (AddRequest) returns (AddResponse) {}
}
]], "calc.proto")
local reg = registrar.new()
reg:register(p.loaded["calc.proto"], {
Add = function(req)
return {result = req.a + req.b}
end
})
local server, err = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
if not server then
print("Failed to start server:", err)
return
end
print("gRPC server listening on 127.0.0.1:50051")
end)server:close()
关闭 gRPC 服务器。
- 参数: 无
- 返回值:
- 成功:
true - 失败:
false, string- false 和错误信息
- 成功:
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local registrar = require "silly.net.grpc.registrar"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Empty {}
service Test {
rpc Ping (Empty) returns (Empty) {}
}
]], "test.proto")
local reg = registrar.new()
reg:register(p.loaded["test.proto"], {
Ping = function() return {} end
})
local server = grpc.listen {
addr = ":50051",
registrar = reg,
}
-- 稍后关闭服务器
local ok, err = server:close()
if ok then
print("Server closed")
else
print("Close failed:", err)
end
end)客户端 API
grpc.newclient(conf)
创建 gRPC 客户端。
- 参数:
conf:table- 客户端配置表service:string(必需) - 服务名称(对应 proto 中的 service 名)endpoints:string[](必需) - gRPC 服务器地址列表,格式"host:port"proto:table(必需) - protoc 加载的 proto 定义tls:boolean|nil(可选) - 是否使用 TLS,默认 falsetimeout:number|nil(可选) - 请求超时时间(毫秒)
- 返回值:
- 成功:
client- 客户端对象 - 失败:
nil, string- nil 和错误信息
- 成功:
- 注意: 客户端对象会动态生成方法,方法名对应 proto 中定义的 RPC 方法
- 负载均衡: 多个 endpoint 时使用轮询策略
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package hello;
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
]], "hello.proto")
local client, err = grpc.newclient {
service = "Greeter",
endpoints = {"127.0.0.1:50051", "127.0.0.1:50052"},
proto = p.loaded["hello.proto"],
timeout = 5000,
}
if not client then
print("Failed to create client:", err)
return
end
-- 客户端对象会自动生成 SayHello 方法
local response, err = client.SayHello({name = "World"})
if response then
print("Response:", response.message)
else
print("RPC failed:", err)
end
end)client.MethodName(request)
调用 RPC 方法(Unary RPC)。
- 参数:
request:table- 请求消息对象(对应 proto 中定义的请求类型)
- 返回值:
- 成功:
table- 响应消息对象 - 失败:
nil, string- nil 和错误信息
- 成功:
- 异步: 会挂起协程直到收到响应或超时
- 注意: 方法名由 proto 定义决定,不同的 service 有不同的方法
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package math;
message MulRequest {
int32 x = 1;
int32 y = 2;
}
message MulResponse {
int32 product = 1;
}
service MathService {
rpc Multiply (MulRequest) returns (MulResponse) {}
}
]], "math.proto")
local client = grpc.newclient {
service = "MathService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["math.proto"],
}
-- 调用 Multiply 方法
local result, err = client.Multiply({x = 6, y = 7})
if result then
print("6 * 7 =", result.product)
else
print("RPC error:", err)
end
end)client.StreamMethod()
创建流式 RPC 连接(Streaming RPC)。
- 参数: 无
- 返回值:
- 成功:
stream- 流对象 - 失败:
nil, string- nil 和错误信息
- 成功:
- 异步: 会挂起协程直到连接建立
- 注意: 仅当 proto 中定义的方法包含 stream 关键字时可用
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package stream;
message DataChunk {
string data = 1;
}
service StreamService {
rpc BiStream (stream DataChunk) returns (stream DataChunk) {}
}
]], "stream.proto")
local client = grpc.newclient {
service = "StreamService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["stream.proto"],
}
-- 创建双向流
local stream, err = client.BiStream()
if not stream then
print("Failed to create stream:", err)
return
end
-- 发送数据
stream:write({data = "Hello"})
-- 读取响应
local response, err = stream:read()
if response then
print("Received:", response.data)
end
stream:close()
end)流式 RPC API
当 RPC 方法定义为流式时(使用 stream 关键字),客户端调用返回流对象:
stream:write(request)
向流写入请求消息。
- 参数:
request:table- 请求消息对象
- 返回值:
- 成功:
true - 失败:
false, string- false 和错误信息
- 成功:
- 异步: 会挂起协程直到数据发送完成
- 适用: Client Streaming 和 Bidirectional Streaming
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package upload;
message FileChunk {
bytes data = 1;
}
message UploadResponse {
string file_id = 1;
}
service FileService {
rpc Upload (stream FileChunk) returns (UploadResponse) {}
}
]], "file.proto")
local client = grpc.newclient {
service = "FileService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["file.proto"],
}
local stream = client.Upload()
-- 分块上传文件
for i = 1, 5 do
local ok, err = stream:write({
data = string.rep("X", 1024)
})
if not ok then
print("Write failed:", err)
break
end
end
stream:close()
end)stream:read([timeout])
从流读取响应消息。
- 参数:
timeout:number|nil(可选) - 读取超时时间(毫秒)
- 返回值:
- 成功:
table- 响应消息对象 - 失败:
nil, string- nil 和错误信息
- 成功:
- 异步: 会挂起协程直到收到数据或超时
- 适用: Server Streaming 和 Bidirectional Streaming
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package log;
message LogRequest {
string query = 1;
}
message LogEntry {
string timestamp = 1;
string message = 2;
}
service LogService {
rpc StreamLogs (LogRequest) returns (stream LogEntry) {}
}
]], "log.proto")
local client = grpc.newclient {
service = "LogService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["log.proto"],
}
local stream = client.StreamLogs()
-- 发送查询请求
stream:write({query = "error"})
-- 持续读取日志流
while true do
local entry, err = stream:read(10000) -- 10秒超时
if not entry then
print("Stream ended:", err)
break
end
print(entry.timestamp, entry.message)
end
stream:close()
end)stream:close()
关闭流连接。
- 参数: 无
- 返回值: 无
- 注意: 关闭后流对象不可再使用
- 示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package chat;
message Message {
string text = 1;
}
service ChatService {
rpc Chat (stream Message) returns (stream Message) {}
}
]], "chat.proto")
local client = grpc.newclient {
service = "ChatService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["chat.proto"],
}
local stream = client.Chat()
-- 发送消息
stream:write({text = "Hello"})
-- 读取回复
local reply = stream:read()
if reply then
print("Reply:", reply.text)
end
-- 关闭连接
stream:close()
end)使用示例
示例1:基础 Unary RPC
实现一个简单的问候服务:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
-- 定义 proto
local p = protoc:new()
p:load([[
syntax = "proto3";
package greeter;
message HelloRequest {
string name = 1;
}
message HelloResponse {
string greeting = 1;
}
service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
rpc SayGoodbye (HelloRequest) returns (HelloResponse) {}
}
]], "greeter.proto")
-- 创建服务器
local reg = registrar.new()
reg:register(p.loaded["greeter.proto"], {
SayHello = function(req)
return {greeting = "Hello, " .. req.name .. "!"}
end,
SayGoodbye = function(req)
return {greeting = "Goodbye, " .. req.name .. "!"}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
print("Greeter server started on 127.0.0.1:50051")
-- 创建客户端
local client = grpc.newclient {
service = "Greeter",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["greeter.proto"],
timeout = 5000,
}
-- 调用 RPC
local resp1 = client.SayHello({name = "Alice"})
print(resp1.greeting) -- Hello, Alice!
local resp2 = client.SayGoodbye({name = "Bob"})
print(resp2.greeting) -- Goodbye, Bob!
server:close()
end)示例2:带错误处理的 RPC 调用
完整的错误处理示例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package calculator;
message DivideRequest {
double dividend = 1;
double divisor = 2;
}
message DivideResponse {
double quotient = 1;
}
service Calculator {
rpc Divide (DivideRequest) returns (DivideResponse) {}
}
]], "calculator.proto")
-- 服务器端
local reg = registrar.new()
reg:register(p.loaded["calculator.proto"], {
Divide = function(req)
if req.divisor == 0 then
-- 服务端可以返回 nil 或抛出错误
error("division by zero")
end
return {quotient = req.dividend / req.divisor}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
-- 客户端
local client = grpc.newclient {
service = "Calculator",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["calculator.proto"],
timeout = 5000,
}
-- 正常调用
local result, err = client.Divide({
dividend = 10,
divisor = 2
})
if result then
print("10 / 2 =", result.quotient)
else
print("RPC failed:", err)
end
-- 错误调用(除零)
local result2, err2 = client.Divide({
dividend = 10,
divisor = 0
})
if result2 then
print("Result:", result2.quotient)
else
print("Expected error:", err2)
end
server:close()
end)示例3:多服务端点负载均衡
使用多个服务器端点实现负载均衡:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package counter;
message IncrementRequest {
int32 delta = 1;
}
message IncrementResponse {
int32 value = 1;
}
service Counter {
rpc Increment (IncrementRequest) returns (IncrementResponse) {}
}
]], "counter.proto")
-- 启动两个服务器实例
local reg1 = registrar.new()
local counter1 = 0
reg1:register(p.loaded["counter.proto"], {
Increment = function(req)
counter1 = counter1 + req.delta
print("Server 1 counter:", counter1)
return {value = counter1}
end
})
local server1 = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg1,
}
local reg2 = registrar.new()
local counter2 = 0
reg2:register(p.loaded["counter.proto"], {
Increment = function(req)
counter2 = counter2 + req.delta
print("Server 2 counter:", counter2)
return {value = counter2}
end
})
local server2 = grpc.listen {
addr = "127.0.0.1:50052",
registrar = reg2,
}
print("Started two servers on ports 50051 and 50052")
-- 客户端连接多个端点
local client = grpc.newclient {
service = "Counter",
endpoints = {
"127.0.0.1:50051",
"127.0.0.1:50052"
},
proto = p.loaded["counter.proto"],
}
-- 请求会轮询到两个服务器
for i = 1, 4 do
local result = client.Increment({delta = 1})
print("Client got value:", result.value)
end
server1:close()
server2:close()
end)示例4:带超时的 RPC 调用
演示超时控制:
local silly = require "silly"
local time = require "silly.time"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package slow;
message SlowRequest {
int32 delay_ms = 1;
}
message SlowResponse {
string result = 1;
}
service SlowService {
rpc SlowMethod (SlowRequest) returns (SlowResponse) {}
}
]], "slow.proto")
-- 服务器:模拟慢响应
local reg = registrar.new()
reg:register(p.loaded["slow.proto"], {
SlowMethod = function(req)
time.sleep(req.delay_ms)
return {result = "Done after " .. req.delay_ms .. "ms"}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
-- 客户端:设置短超时
local client = grpc.newclient {
service = "SlowService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["slow.proto"],
timeout = 1000, -- 1秒超时
}
-- 快速请求(应该成功)
local result1, err1 = client.SlowMethod({delay_ms = 100})
if result1 then
print("Fast request:", result1.result)
else
print("Fast request failed:", err1)
end
-- 慢速请求(应该超时)
local result2, err2 = client.SlowMethod({delay_ms = 2000})
if result2 then
print("Slow request:", result2.result)
else
print("Slow request timeout:", err2)
end
server:close()
end)示例5:TLS 加密的 gRPC 服务
使用 TLS 保护 gRPC 通信:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package secure;
message SecureRequest {
string secret = 1;
}
message SecureResponse {
string result = 1;
}
service SecureService {
rpc ProcessSecret (SecureRequest) returns (SecureResponse) {}
}
]], "secure.proto")
-- 测试用的自签名证书
local cert_pem = [[-----BEGIN CERTIFICATE-----
MIIDCTCCAfGgAwIBAgIUPc2faaWEjGh1RklF9XPAgYS5WSMwDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI1MTAwOTA5NDc1M1oXDTM1MTAw
NzA5NDc1M1owFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEApmUl+7J8zeWdOH6aiNwRSOcFePTxuAyYsAEewVtBCAEv
LVGxQtrsVvd6UosEd0aO/Qz3hvV32wYzI0ZzjGGfy0lCCx9YB05SyYY+KpDwe/os
Mf4RtBS/jN1dVX7TiRQ3KsngMFSXp2aC6IpI5ngF0PS/o2qbwkU19FCELE6G5WnA
fniUaf7XEwrhAkMAczJovqOu4BAhBColr7cQK7CQK6VNEhQBzM/N/hGmIniPbC7k
TjqyohWoLGPT+xQAe8WB39zbIHl+xEDoGAYaaI8I7TlcQWwCOIxdm+w67CQmC/Fy
GTX5fPoK96drushzwvAKphQrpQwT5MxTDvoE9xgbhQIDAQABo1MwUTAdBgNVHQ4E
FgQUsjX1LC+0rS4Ls5lcE8yg5P85LqQwHwYDVR0jBBgwFoAUsjX1LC+0rS4Ls5lc
E8yg5P85LqQwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEADqDJ
HQxRjFPSxIk5EMrxkqxE30LoWKJeW9vqublQU/qHfMo7dVTwfsAvFpTJfL7Zhhqw
l20ijbQVxPtDwPB8alQ/ScP5VRqC2032KTi9CqUqTj+y58oDxgjnm06vr5d8Xkmm
nR2xhUecGkzFYlDoXo1w8XttMUefyHS6HWLXvu94V7Y/8YB4lBCEnwFnhgkYB9CG
RsleiOiZDsaHhnNQsnM+Xl1UJVxJlMStl+Av2rCTAj/LMHniXQ+9QKI/7pNDUeCL
qSdxZephYkeRF8C/i9R5G/gAL40kUFz0sgyXuv/kss3rrxsshKKTRbxnRm1k/J73
9ZiztVOeqpcxFxmf7Q==
-----END CERTIFICATE-----
]]
local key_pem = [[-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCmZSX7snzN5Z04
fpqI3BFI5wV49PG4DJiwAR7BW0EIAS8tUbFC2uxW93pSiwR3Ro79DPeG9XfbBjMj
RnOMYZ/LSUILH1gHTlLJhj4qkPB7+iwx/hG0FL+M3V1VftOJFDcqyeAwVJenZoLo
ikjmeAXQ9L+japvCRTX0UIQsToblacB+eJRp/tcTCuECQwBzMmi+o67gECEEKiWv
txArsJArpU0SFAHMz83+EaYieI9sLuROOrKiFagsY9P7FAB7xYHf3NsgeX7EQOgY
BhpojwjtOVxBbAI4jF2b7DrsJCYL8XIZNfl8+gr3p2u6yHPC8AqmFCulDBPkzFMO
+gT3GBuFAgMBAAECggEAD5uyVetWuKuetVNu5IKcHnYJNeDoIacQ1YWtYF7SeVE/
HyWoFojZnYjGUSLYLuYP+J20RFUXQpTQzDDKGvN3XUbIaqmshLbsnhm5EB4baM29
Qo0+FOHTW//RxvjIF/Ys/JcGMBJnTV0Yz35VO0Ur6n9i0I3qAW2jk4DP/SX6kl9T
4iJj2Y+69y0bHjesfO71nCUUH6Ym2CHJRd6A4tCeYQr3U/CXOWggpUuPTXFWptt7
uSJjbTQgwUF5H83ih1CUdto1G5LPBUXVD5x2XZshgwZsL1au9kH2l/83BAHKK8io
LQ8FekLN6FLD83mvEwFPyrVhfipbeUz3bKrgEzvOmwKBgQDUbrAgRYCLxxpmguiN
0aPV85xc+VPL+dh865QHhJ0pH/f3fah/U7van/ayfG45aIA+DI7qohGzf03xFnO4
O51RHcRhnjDbXWY5l0ZpOIpvHLLCm8gqIAkX9bt7UyE+PxRSNvUt3kVFT3ZYnYCx
Wb1kiV1oRAzTf1l0X0qamFPqdwKBgQDIhV8OWTBrsuC0U3hmvNB+DPEHnyPWBHvI
+HMflas5gJiZ+3KvrS3vBOXFB3qfTD1LQwUPqeqY0Q41Svvsq2IQAkKedJDdMuPU
RoKaV/Qln85nmibscNcwVGQNUKTeSCJQ43ktrWT01UinamsSEOYTceMqwW10LDaF
Ff1MbKNs4wKBgQDMEPiIR7vQipdF2oNjmPt1z+tpNOnWjE/20KcHAdGna9pcmQ2A
IwPWZMwrcXTBGS34bT/tDXtLnwNUkWjglgPtpFa+H6R3ViWZNUSiV3pEeqEOaW/D
Z7rUlW5gbd8FWLtAryKfyWFpz4e0YLj7pWVWas6cFqLrmO5p6BBWqfYSyQKBgHyp
rjcVa+0JAHobircUm+pB0XeTkIv1rZ98FtaEDjdpo3XXxa1CVVRMDy03QRzYISMx
P2xFjvwCvHqVa5nv0r9xKEmq3oUmpk3KqFecZsUdXQ074QcOADqjvLAqetVWsz7m
rOeg7SrpjonGt1o7904Pd9OU/Z9D/YEv8pIY2GFRAoGASEf3+igRFSECUxLh9LZC
scAxCHh9sz15swDD/rdtEqLKGcxlu74YKkBnyQ/yWA4d/enPnvdP98ThXdXnX0X4
v1HSCliKZXW8cusnBRD2IOyxuIUV/qiMfARylMvlLBccgJR8+olH9f/yF2EFWhoy
125zQzr/ESlTL+5IWeNf2sM=
-----END PRIVATE KEY-----
]]
-- TLS 服务器
local reg = registrar.new()
reg:register(p.loaded["secure.proto"], {
ProcessSecret = function(req)
return {result = "Processed: " .. req.secret}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
tls = true,
alpnprotos = {"h2"},
certs = {
{
cert = cert_pem,
key = key_pem,
}
},
}
print("Secure gRPC server started")
-- TLS 客户端
local client = grpc.newclient {
service = "SecureService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["secure.proto"],
tls = true,
}
local result = client.ProcessSecret({
secret = "my-sensitive-data"
})
if result then
print("Secure response:", result.result)
end
server:close()
end)示例6:并发 RPC 调用
使用协程实现并发请求:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local waitgroup = require "silly.sync.waitgroup"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package api;
message GetRequest {
int32 id = 1;
}
message GetResponse {
int32 id = 1;
string data = 2;
}
service DataService {
rpc GetData (GetRequest) returns (GetResponse) {}
}
]], "api.proto")
-- 服务器
local reg = registrar.new()
reg:register(p.loaded["api.proto"], {
GetData = function(req)
return {
id = req.id,
data = "Data for ID " .. req.id
}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
-- 客户端
local client = grpc.newclient {
service = "DataService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["api.proto"],
}
-- 并发请求多个资源
local wg = waitgroup.new()
local results = {}
for i = 1, 10 do
wg:fork(function()
local result = client.GetData({id = i})
if result then
results[i] = result.data
print("Received:", result.data)
end
end)
end
wg:wait()
print("All requests completed, got", #results, "results")
server:close()
end)示例7:服务器流式 RPC
演示服务器流式响应:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package events;
message EventRequest {
string topic = 1;
}
message Event {
int32 sequence = 1;
string message = 2;
}
service EventService {
rpc Subscribe (EventRequest) returns (stream Event) {}
}
]], "events.proto")
-- 注意:当前实现主要支持 Unary RPC
-- 完整的流式 RPC 需要服务器端的额外实现
-- 这里展示客户端如何处理流式响应
local reg = registrar.new()
reg:register(p.loaded["events.proto"], {
Subscribe = function(req)
-- Unary 版本:返回单个事件
return {
sequence = 1,
message = "Event for topic: " .. req.topic
}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
local client = grpc.newclient {
service = "EventService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["events.proto"],
}
-- 对于流式方法,调用返回流对象
local stream, err = client.Subscribe()
if stream then
-- 写入请求
stream:write({topic = "notifications"})
-- 读取流式响应
local event = stream:read(5000)
if event then
print("Event:", event.sequence, event.message)
end
stream:close()
end
server:close()
end)示例8:复杂数据结构的 RPC
处理嵌套和复杂的 Protocol Buffers 消息:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package user;
message Address {
string street = 1;
string city = 2;
string country = 3;
}
message User {
int32 id = 1;
string name = 2;
repeated string emails = 3;
Address address = 4;
map<string, string> metadata = 5;
}
message CreateUserRequest {
User user = 1;
}
message CreateUserResponse {
User user = 1;
bool success = 2;
}
service UserService {
rpc CreateUser (CreateUserRequest) returns (CreateUserResponse) {}
}
]], "user.proto")
-- 服务器
local reg = registrar.new()
local next_id = 1
reg:register(p.loaded["user.proto"], {
CreateUser = function(req)
local user = req.user
user.id = next_id
next_id = next_id + 1
print("Created user:", user.name)
print("Address:", user.address.city, user.address.country)
print("Emails:", table.concat(user.emails, ", "))
return {
user = user,
success = true
}
end
})
local server = grpc.listen {
addr = "127.0.0.1:50051",
registrar = reg,
}
-- 客户端
local client = grpc.newclient {
service = "UserService",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["user.proto"],
}
local response = client.CreateUser({
user = {
name = "Alice Smith",
emails = {
"alice@example.com",
"asmith@work.com"
},
address = {
street = "123 Main St",
city = "Springfield",
country = "USA"
},
metadata = {
department = "Engineering",
level = "Senior"
}
}
})
if response and response.success then
print("User created with ID:", response.user.id)
end
server:close()
end)注意事项
1. 协程要求
所有 gRPC API(服务器和客户端)必须在协程中调用:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local task = require "silly.task"
-- 错误:不能在主线程调用
-- local client = grpc.newclient(...) -- 会失败
-- 正确:在协程中调用
task.fork(function()
local client = grpc.newclient({
-- ...
})
-- 正常使用
end)2. HTTP/2 依赖
gRPC 依赖 HTTP/2 传输层:
- 服务器自动使用 HTTP/2
- TLS 场景使用 ALPN 协商
h2协议 - 客户端会自动检测并使用 HTTP/2
3. Protocol Buffers 版本
使用 proto3 语法(推荐):
local protoc = require "protoc"
local p = protoc:new()
-- 推荐:使用 proto3
p:load([[
syntax = "proto3";
package myapp;
-- ...
]], "myapp.proto")4. 错误处理
始终检查返回值,gRPC 调用可能失败:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Req {}
message Resp {}
service Test {
rpc Call (Req) returns (Resp) {}
}
]], "test.proto")
local client, err = grpc.newclient {
service = "Test",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["test.proto"],
}
if not client then
print("Failed to create client:", err)
return
end
local response, err = client.Call({})
if not response then
print("RPC call failed:", err)
return
end
-- 处理成功响应
end)5. 消息大小限制
服务器端限制单个消息大小为 4MB:
-- grpc.lua 中的常量
local MAX_LEN = 4*1024*1024 -- 4MB超过限制会返回 ResourceExhausted 错误。
6. 超时设置
客户端超时是可选的,建议设置合理的超时值:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Empty {}
service Test {
rpc Ping (Empty) returns (Empty) {}
}
]], "test.proto")
local client = grpc.newclient {
service = "Test",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["test.proto"],
timeout = 5000, -- 5秒超时(推荐设置)
}
-- 超时后返回 nil 和错误信息
local result, err = client.Ping({})
if not result then
print("Request timeout or failed:", err)
end
end)7. 服务注册顺序
必须先注册服务,再启动服务器:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Empty {}
service Test {
rpc Ping (Empty) returns (Empty) {}
}
]], "test.proto")
local reg = registrar.new()
-- 1. 先注册服务
reg:register(p.loaded["test.proto"], {
Ping = function() return {} end
})
-- 2. 再启动服务器
local server = grpc.listen {
addr = ":50051",
registrar = reg,
}
server:close()
end)8. 负载均衡策略
客户端使用简单的轮询(Round-Robin)策略:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Empty {}
service Test {
rpc Ping (Empty) returns (Empty) {}
}
]], "test.proto")
-- 请求会按顺序分发到三个端点
local client = grpc.newclient {
service = "Test",
endpoints = {
"server1.example.com:50051", -- 第1个请求
"server2.example.com:50051", -- 第2个请求
"server3.example.com:50051", -- 第3个请求
-- 第4个请求回到 server1...
},
proto = p.loaded["test.proto"],
}
-- 10 个请求会均匀分布到 3 个服务器
for i = 1, 10 do
client.Ping({})
end
end)性能建议
1. 连接复用
gRPC 基于 HTTP/2,自动复用连接:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Request {
int32 id = 1;
}
message Response {
string data = 1;
}
service Test {
rpc Get (Request) returns (Response) {}
}
]], "test.proto")
local client = grpc.newclient {
service = "Test",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["test.proto"],
}
-- HTTP/2 连接会被自动复用
for i = 1, 100 do
client.Get({id = i})
end
-- 所有请求共享同一个 TCP 连接
end)2. 并发调用
使用 waitgroup 实现并发 RPC 请求:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local waitgroup = require "silly.sync.waitgroup"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Request {
int32 id = 1;
}
message Response {
string data = 1;
}
service Test {
rpc Get (Request) returns (Response) {}
}
]], "test.proto")
local client = grpc.newclient {
service = "Test",
endpoints = {"127.0.0.1:50051"},
proto = p.loaded["test.proto"],
}
local wg = waitgroup.new()
-- 并发发送 20 个请求
for i = 1, 20 do
wg:fork(function()
local result = client.Get({id = i})
if result then
print("Got result for ID", i)
end
end)
end
wg:wait()
print("All concurrent requests completed")
end)3. 消息设计
优化 Protocol Buffers 消息结构:
- 使用合适的字段类型(int32 vs int64)
- 避免过大的消息(受 4MB 限制)
- 使用 repeated 和 map 提高表达能力
- 避免深度嵌套
local protoc = require "protoc"
local p = protoc:new()
p:load([[
syntax = "proto3";
package optimize;
// 好的设计:字段类型合适,结构清晰
message GoodRequest {
int32 user_id = 1; // 用户ID通常不需要 int64
repeated string tags = 2; // 使用 repeated 而不是分隔的字符串
map<string, int32> counts = 3; // 使用 map 而不是列表
}
// 避免:过深的嵌套
message BadRequest {
message Level1 {
message Level2 {
message Level3 {
string data = 1; // 3层嵌套,不推荐
}
Level3 level3 = 1;
}
Level2 level2 = 1;
}
Level1 level1 = 1;
}
]], "optimize.proto")4. 服务器端池化
对于高负载场景,启动多个服务器实例:
local silly = require "silly"
local grpc = require "silly.net.grpc"
local protoc = require "protoc"
local registrar = require "silly.net.grpc.registrar"
local task = require "silly.task"
task.fork(function()
local p = protoc:new()
p:load([[
syntax = "proto3";
package test;
message Empty {}
service Test {
rpc Ping (Empty) returns (Empty) {}
}
]], "test.proto")
local servers = {}
-- 启动多个服务器实例,监听不同端口
for port = 50051, 50054 do
local reg = registrar.new()
reg:register(p.loaded["test.proto"], {
Ping = function() return {} end
})
servers[#servers + 1] = grpc.listen {
addr = "127.0.0.1:" .. port,
registrar = reg,
}
print("Server started on port", port)
end
-- 清理
for _, server in ipairs(servers) do
server:close()
end
end)5. 批量处理
对于大量小请求,考虑批量处理:
local protoc = require "protoc"
local p = protoc:new()
p:load([[
syntax = "proto3";
package batch;
message Item {
int32 id = 1;
string name = 2;
}
// 推荐:批量请求
message BatchGetRequest {
repeated int32 ids = 1;
}
message BatchGetResponse {
repeated Item items = 1;
}
service DataService {
rpc BatchGet (BatchGetRequest) returns (BatchGetResponse) {}
}
]], "batch.proto")
-- 批量获取比单个循环调用更高效参见
- silly - 核心模块
- silly.net.http - HTTP 协议(gRPC 的传输层)
- silly.net.tcp - TCP 协议
- silly.net.tls - TLS/SSL 加密
- silly.sync.waitgroup - 协程等待组