gRPC Service Implementation
Purpose
Practical walkthrough of building a gRPC service in Python: proto schema design, code generation, server implementation, client usage, streaming, and error handling. Synthesized from: gRPC and Protobuf.
Examples
Step 1 — Define the proto schema (inference.proto):
// inference.proto — wire contract for the InferenceService gRPC API.
syntax = "proto3";
package inference;

service InferenceService {
// Unary RPC: one request in, one response out.
rpc Predict (PredictRequest) returns (PredictResponse);
// Unary RPC over a whole batch of requests.
rpc BatchPredict (BatchPredictRequest) returns (BatchPredictResponse);
// Server-streaming: return results as they are computed
rpc StreamPredict (StreamRequest) returns (stream PredictResponse);
}

message PredictRequest {
string model_name = 1;
repeated float features = 2; // repeated = array
map<string, string> metadata = 3; // free-form key/value pairs
}

message PredictResponse {
string label = 1;
float score = 2;
int64 latency_ms = 3; // server-side latency in milliseconds
}

message BatchPredictRequest {
repeated PredictRequest requests = 1;
}

message BatchPredictResponse {
repeated PredictResponse responses = 1; // same order as the requests
}

message StreamRequest {
string model_name = 1;
int32 n_samples = 2; // number of streamed responses to produce
}

Step 2 — Generate Python code:
# Install the gRPC runtime and the protoc plugin for Python.
pip install grpcio grpcio-tools

# -I sets the proto search path; the three *_out flags emit message
# classes, .pyi type stubs, and the service stubs respectively.
python -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --pyi_out=. \
    --grpc_python_out=. \
    inference.proto
# generates: inference_pb2.py, inference_pb2.pyi, inference_pb2_grpc.py

Step 3 — Implement the server:
import grpc
from concurrent import futures
import time
import inference_pb2
import inference_pb2_grpc
class InferenceServicer(inference_pb2_grpc.InferenceServiceServicer):
    """Implements every RPC declared in inference.proto's InferenceService."""

    def __init__(self, model):
        # Model is expected to expose predict(batch) and predict_one(i),
        # as used by the methods below — confirm against the caller.
        self.model = model

    def Predict(self, request, context):
        """Unary RPC: run one prediction and report server-side latency.

        Aborts the RPC with INVALID_ARGUMENT if the model rejects the
        features with a ValueError.
        """
        try:
            start = time.perf_counter()
            result = self.model.predict([request.features])[0]
            latency = int((time.perf_counter() - start) * 1000)
            return inference_pb2.PredictResponse(
                label=str(result["label"]),
                score=float(result["score"]),
                latency_ms=latency,
            )
        except ValueError as e:
            # context.abort() raises, so no return is needed after it.
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, str(e))

    def BatchPredict(self, request, context):
        """Unary batched RPC declared in the proto.

        Previously unimplemented, so clients received UNIMPLEMENTED.
        Responses come back in request order; a ValueError in any item
        aborts the whole batch (via Predict).
        """
        responses = [self.Predict(req, context) for req in request.requests]
        return inference_pb2.BatchPredictResponse(responses=responses)

    def StreamPredict(self, request, context):
        """Server-streaming: yield one response per sample.

        Stops as soon as the client cancels or disconnects — the original
        kept looping over the remaining samples and merely skipped the
        yield, wasting model work.
        """
        for i in range(request.n_samples):
            if not context.is_active():
                return
            result = self.model.predict_one(i)
            yield inference_pb2.PredictResponse(
                label=result["label"], score=result["score"]
            )
def serve(model, port: int = 50051):
    """Start a blocking gRPC server hosting InferenceServicer on *port*."""
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            # Raise the default 4 MB message cap to 50 MB for large payloads.
            ("grpc.max_send_message_length", 50 * 1024 * 1024),
            ("grpc.max_receive_message_length", 50 * 1024 * 1024),
        ],
    )
    inference_pb2_grpc.add_InferenceServiceServicer_to_server(
        InferenceServicer(model), server
    )
    # Plaintext port — see the TLS / mTLS section for production setups.
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    print(f"gRPC server listening on :{port}")
server.wait_for_termination()

Step 4 — Implement the client:
import grpc
import inference_pb2
import inference_pb2_grpc
def get_channel(host: str = "localhost", port: int = 50051) -> grpc.Channel:
    """Open a plaintext channel to *host*:*port* with keepalive pings."""
    target = f"{host}:{port}"
    channel_options = [("grpc.keepalive_time_ms", 10000)]
    return grpc.insecure_channel(target, options=channel_options)
def predict(channel, model_name: str, features: list[float]) -> dict:
    """Run a unary Predict RPC over *channel*.

    Returns a dict with "label" and "score".
    Raises RuntimeError (chained to the underlying RpcError) on failure.
    """
    stub = inference_pb2_grpc.InferenceServiceStub(channel)
    request = inference_pb2.PredictRequest(
        model_name=model_name,
        features=features,
    )
    try:
        # Always set a deadline — an RPC without a timeout can hang forever.
        response = stub.Predict(request, timeout=5.0)
    except grpc.RpcError as e:
        # Chain the cause so status code/details survive in the traceback
        # (the original `raise RuntimeError(...)` dropped the cause).
        raise RuntimeError(f"gRPC error {e.code()}: {e.details()}") from e
    return {"label": response.label, "score": response.score}
# Channels hold sockets; the context manager closes them on exit.
with get_channel() as channel:
result = predict(channel, "classifier", [1.0, 2.0, 3.0])

TLS / mTLS (production):
# Server: ssl_server_credentials takes (private key, certificate chain)
# PEM pairs. Use `with` so the file handles are closed (the original
# open(...).read() calls leaked them).
with open("server.key", "rb") as key_file, open("server.crt", "rb") as cert_file:
    server_credentials = grpc.ssl_server_credentials(
        [(key_file.read(), cert_file.read())]
    )
server.add_secure_port("[::]:50051", server_credentials)
# Client: trust the CA that signed the server certificate.
with open("ca.crt", "rb") as ca_file:
    channel_credentials = grpc.ssl_channel_credentials(
        root_certificates=ca_file.read()
    )
channel = grpc.secure_channel("myservice:50051", channel_credentials)

Testing gRPC services:
import pytest
import grpc  # the fixture uses grpc.* names; `from grpc import experimental` alone left `grpc` undefined

@pytest.fixture
def grpc_channel(model):
    """Yield a ready channel to a locally running test server.

    Fixes vs. the original: channel_ready_future is grpc.channel_ready_future
    (not grpc.experimental); it returns a Future, not a context manager, so
    block on .result(); and the fixture must yield the channel (it yielded
    None) and close it afterwards.
    """
    channel = grpc.insecure_channel("localhost:50052")
    grpc.channel_ready_future(channel).result(timeout=5.0)
    yield channel
    channel.close()

# Or use grpcio-testing:
# from grpc_testing import server_from_dictionary, strict_real_time
def test_predict_unary(model):
    """Unit-test the servicer directly — no server or network required."""
    servicer = InferenceServicer(model)
    # MockContext is assumed to be defined alongside the tests and to
    # provide the ServicerContext methods Predict uses (abort) — TODO confirm.
    context = MockContext()
    req = inference_pb2.PredictRequest(model_name="cls", features=[1.0, 2.0])
    resp = servicer.Predict(req, context)
assert resp.score > 0.0

Architecture
Client (Python/Go/Rust)
│
│ HTTP/2 + binary Protobuf frames
▼
gRPC Server (Python ThreadPoolExecutor)
├── InferenceServicer.Predict() ← unary
├── InferenceServicer.BatchPredict() ← unary, batched
└── InferenceServicer.StreamPredict() ← server-streaming
When to prefer gRPC over REST:
- Internal service-to-service communication where latency and throughput matter
- Strongly-typed contracts shared across multiple language clients (generate stubs for Go, Python, Rust from one .proto)
- Streaming: server-push, bidirectional streaming
- Large message volumes: Protobuf is ~3–10× smaller than equivalent JSON
When to prefer REST:
- Public-facing APIs consumed by browsers (gRPC-Web adds complexity)
- Teams that prioritise human-readable payloads for debugging
- Simple request/response with no streaming requirements