REST APIs built on HTTP and JSON are everywhere, but they come with trade-offs: verbose payloads, loose contracts, and no built-in streaming. When services need to talk to each other at high throughput — hundreds of thousands of calls per second with tight latency budgets — gRPC is the industry answer. Used by Google, Netflix, and Cloudflare, gRPC combines Protocol Buffers for compact binary serialization with HTTP/2 for multiplexed connections and native bidirectional streaming.

The grpcio Python package brings the full gRPC runtime to Python. You define services in a .proto schema file, run a code generator, and get strongly-typed client and server stubs automatically. The contract lives in the schema, not in documentation that can drift out of sync. If you change the proto file, the generated code changes with it — no more “the client sends a string but the server expects an int” bugs.

This article covers the complete gRPC workflow in Python: writing a .proto service definition, generating Python stubs, building a server, writing a client, and implementing all four RPC types — unary, server-streaming, client-streaming, and bidirectional streaming.

gRPC Quick Example: Hello Service in 30 Lines

The fastest way to see gRPC in action is a minimal hello service. You need three files: the schema, the server, and the client:

// hello.proto
syntax = "proto3";
package hello;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest { string name = 1; }
message HelloReply   { string message = 1; }

# Generate Python stubs (run once after changing the .proto)
pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello.proto
# Creates: hello_pb2.py  hello_pb2_grpc.py

# server.py
import grpc
from concurrent import futures
import hello_pb2
import hello_pb2_grpc

class GreeterServicer(hello_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        return hello_pb2.HelloReply(message=f"Hello, {request.name}!")

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on :50051")
server.wait_for_termination()

# client.py
import grpc
import hello_pb2
import hello_pb2_grpc

with grpc.insecure_channel('localhost:50051') as channel:
    stub = hello_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(hello_pb2.HelloRequest(name="World"))
    print(f"Server replied: {response.message}")

Output:
Server replied: Hello, World!

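Three files, zero JSON parsing, zero URL routing. Python has no compile step, but the generated message classes still enforce the schema early: an unknown field name or a value of the wrong type raises an error when the message is constructed, before anything goes over the wire.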

Installing grpcio

Install both the runtime and the code generation tools:

pip install grpcio grpcio-tools

# Verify
python -c "import grpc; print(grpc.__version__)"

Output:
1.62.1

grpcio is the runtime — servers, channels, and interceptors. grpcio-tools bundles the protoc compiler with the Python gRPC plugin so you can generate stubs without installing a separate C++ toolchain.

Package                  Purpose                                  Required at
grpcio                   Client/server runtime                    Runtime
grpcio-tools             Proto compiler + code generation         Dev/build time
protobuf                 Protocol Buffers serialization           Runtime (auto-installed)
grpcio-reflection        Server reflection for grpcurl/Postman    Optional (dev)
grpcio-health-checking   Standard health check service            Optional (prod)

Figure: a .proto file generating Python code. grpc_tools.protoc: your schema becomes your code.
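
To check which of the packages listed above are present in the current environment, a small standard-library script is enough. This is a minimal sketch: the helper name installed_versions is made up for illustration, and it only reads installed package metadata, it does not import gRPC itself.

```python
from importlib import metadata

# Distribution names as listed in the package table above.
PACKAGES = [
    "grpcio",
    "grpcio-tools",
    "protobuf",
    "grpcio-reflection",
    "grpcio-health-checking",
]

def installed_versions(names):
    """Return {distribution name: version string, or None if not installed}."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions

if __name__ == "__main__":
    for name, version in installed_versions(PACKAGES).items():
        print(f"{name:<24} {version or 'not installed'}")
```

A missing optional package shows up as "not installed" instead of raising, which makes the script safe to run in a fresh virtual environment.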

Writing a Protocol Buffers Service Definition

The .proto file is the source of truth for your service. It defines the service methods and the request/response message types. Here is a more realistic example — a currency conversion service:

// finance.proto
syntax = "proto3";
package finance;

service CurrencyConverter {
  // Unary: one request, one response
  rpc Convert (ConvertRequest) returns (ConvertReply);

  // Server streaming: one request, stream of rate updates
  rpc WatchRates (WatchRequest) returns (stream RateUpdate);

  // Client streaming: stream of amounts, one summary response
  rpc BatchConvert (stream ConvertRequest) returns (BatchReply);

  // Bidirectional streaming: real-time conversion feed
  rpc LiveConvert (stream ConvertRequest) returns (stream ConvertReply);
}

message ConvertRequest {
  string from_currency = 1;   // e.g. "USD"
  string to_currency   = 2;   // e.g. "EUR"
  double amount        = 3;
}

message ConvertReply {
  double converted_amount = 1;
  double rate             = 2;
  string timestamp        = 3;
}

message WatchRequest {
  string base_currency = 1;
  repeated string target_currencies = 2;
}

message RateUpdate {
  string currency = 1;
  double rate     = 2;
}

message BatchReply {
  double total_converted = 1;
  int32  count           = 2;
}

# Generate stubs
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. finance.proto
# Creates: finance_pb2.py  finance_pb2_grpc.py

Field numbers (the = 1, = 2 after field names) identify each field in the binary encoding. They must be unique within a message and should never be reused once deployed. Adding new fields with new numbers is backward-compatible; renumbering a field or changing its type breaks existing clients. If you remove a field, mark its number as reserved so it cannot be reused later.
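
To see why the number matters and the name does not, it can help to encode one field by hand. The sketch below is plain Python and does not use the protobuf library; the helper names encode_varint and encode_string_field are made up for illustration. It encodes HelloRequest with name = "World" following the protobuf wire format, where each field starts with a tag computed as (field_number << 3) | wire_type.

```python
import json

WIRE_TYPE_LEN = 2  # length-delimited: strings, bytes, nested messages

def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)

def encode_string_field(field_number: int, text: str) -> bytes:
    """Encode one string field as tag + length + UTF-8 payload."""
    payload = text.encode("utf-8")
    tag = (field_number << 3) | WIRE_TYPE_LEN
    return encode_varint(tag) + encode_varint(len(payload)) + payload

# HelloRequest { string name = 1; } with name = "World"
wire = encode_string_field(1, "World")
as_json = json.dumps({"name": "World"}).encode("utf-8")

print(wire.hex())                # 0a05576f726c64
print(len(wire), len(as_json))   # 7 17
```

The field name never appears in the encoded bytes, only the number 1 packed into the first byte. That is why renumbering a field changes what every reader sees, and also where the size advantage over JSON comes from.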

The Four gRPC RPC Types

gRPC defines four communication patterns. Here is a complete server implementing all four for the finance service:

# finance_server.py
import grpc
import time
import random
from concurrent import futures
import finance_pb2
import finance_pb2_grpc

# Simulated exchange rates
RATES = {
    ('USD', 'EUR'): 0.92, ('USD', 'GBP'): 0.79, ('USD', 'JPY'): 149.5,
    ('EUR', 'USD'): 1.09, ('GBP', 'USD'): 1.27, ('JPY', 'USD'): 0.0067,
}

def get_rate(frm, to):
    if frm == to:
        return 1.0
    return RATES.get((frm, to), 1.0) * (1 + random.uniform(-0.002, 0.002))

class CurrencyConverterServicer(finance_pb2_grpc.CurrencyConverterServicer):

    # --- Unary RPC ---
    def Convert(self, request, context):
        rate = get_rate(request.from_currency, request.to_currency)
        return finance_pb2.ConvertReply(
            converted_amount=request.amount * rate,
            rate=rate,
            timestamp=str(time.time())
        )

    # --- Server Streaming RPC ---
    def WatchRates(self, request, context):
        """Stream rate updates every second until the client cancels."""
        # is_active() turns False once the client cancels or disconnects
        while context.is_active():
            for target in request.target_currencies:
                rate = get_rate(request.base_currency, target)
                yield finance_pb2.RateUpdate(currency=target, rate=rate)
            time.sleep(1.0)

    # --- Client Streaming RPC ---
    def BatchConvert(self, request_iterator, context):
        total = 0.0
        count = 0
        for req in request_iterator:
            rate = get_rate(req.from_currency, req.to_currency)
            total += req.amount * rate
            count += 1
        return finance_pb2.BatchReply(total_converted=total, count=count)

    # --- Bidirectional Streaming RPC ---
    def LiveConvert(self, request_iterator, context):
        for req in request_iterator:
            rate = get_rate(req.from_currency, req.to_currency)
            yield finance_pb2.ConvertReply(
                converted_amount=req.amount * rate,
                rate=rate,
                timestamp=str(time.time())
            )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    finance_pb2_grpc.add_CurrencyConverterServicer_to_server(
        CurrencyConverterServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Finance gRPC server running on :50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

Figure: gRPC four RPC types: pick the pattern that fits your data flow.
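
Stripped of the gRPC machinery, the four patterns differ only in whether each side passes a single value or an iterator. The sketch below uses plain Python functions and generators to show those four shapes; the function names and the fixed RATE value are illustrative stand-ins, not part of the generated code.

```python
from typing import Iterable, Iterator, Tuple

RATE = 0.92  # fixed illustrative USD -> EUR rate, not real market data

def convert(amount: float) -> float:
    """Unary: one request in, one response out."""
    return amount * RATE

def watch_rates(ticks: int) -> Iterator[float]:
    """Server streaming: one request in, a stream of responses out."""
    for _ in range(ticks):
        yield RATE

def batch_convert(amounts: Iterable[float]) -> Tuple[float, int]:
    """Client streaming: a stream of requests in, one summary out."""
    total, count = 0.0, 0
    for amount in amounts:
        total += amount * RATE
        count += 1
    return total, count

def live_convert(amounts: Iterable[float]) -> Iterator[float]:
    """Bidirectional streaming: one response yielded per request consumed."""
    for amount in amounts:
        yield amount * RATE

if __name__ == "__main__":
    print(convert(100))
    print(list(watch_rates(3)))
    print(batch_convert(iter([100, 250])))
    print(list(live_convert(iter([50, 100, 200]))))
```

The servicer methods above follow the same shapes: a streaming response is a generator that yields messages, and a streaming request arrives as an iterator the handler loops over.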

Writing the gRPC Client

The generated stub class handles all the serialization. Each of the four RPC types gets a corresponding client pattern:

# finance_client.py
import grpc
import finance_pb2
import finance_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = finance_pb2_grpc.CurrencyConverterStub(channel)

        # 1. Unary call
        reply = stub.Convert(finance_pb2.ConvertRequest(
            from_currency='USD', to_currency='EUR', amount=100.0
        ))
        print(f"Unary: $100 USD = €{reply.converted_amount:.2f} EUR (rate: {reply.rate:.4f})")

        # 2. Server streaming: receive 6 rate updates, then cancel the call
        watch_req = finance_pb2.WatchRequest(
            base_currency='USD',
            target_currencies=['EUR', 'GBP', 'JPY']
        )
        rate_stream = stub.WatchRates(watch_req)
        update_count = 0
        for update in rate_stream:
            print(f"Rate update: USD/{update.currency} = {update.rate:.4f}")
            update_count += 1
            if update_count >= 6:
                break
        # Leaving the loop does not end the RPC; cancel it so the server stops streaming
        rate_stream.cancel()

        # 3. Client streaming: send a batch of conversions
        def batch_requests():
            pairs = [('USD','EUR',100), ('USD','GBP',250), ('EUR','USD',80)]
            for frm, to, amt in pairs:
                yield finance_pb2.ConvertRequest(
                    from_currency=frm, to_currency=to, amount=amt
                )
        batch_reply = stub.BatchConvert(batch_requests())
        print(f"Batch: {batch_reply.count} conversions, total = {batch_reply.total_converted:.2f}")

        # 4. Bidirectional streaming
        def live_requests():
            for amount in [50, 100, 200]:
                yield finance_pb2.ConvertRequest(
                    from_currency='USD', to_currency='JPY', amount=amount
                )
        for reply in stub.LiveConvert(live_requests()):
            print(f"Live: ¥{reply.converted_amount:.2f} JPY at rate {reply.rate:.2f}")

if __name__ == '__main__':
    run()

Output (rates include random jitter, so exact figures vary between runs):
Unary: $100 USD = €92.14 EUR (rate: 0.9214)
Rate update: USD/EUR = 0.9198
Rate update: USD/GBP = 0.7893
Rate update: USD/JPY = 149.8200
Rate update: USD/EUR = 0.9207
Rate update: USD/GBP = 0.7901
Rate update: USD/JPY = 149.7100
Batch: 3 conversions, total = 376.81
Live: ¥7479.50 JPY at rate 149.59
Live: ¥14955.00 JPY at rate 149.55
Live: ¥29942.00 JPY at rate 149.71

Error Handling and Status Codes

gRPC defines its own set of status codes, independent of HTTP status codes: the code travels in the grpc-status trailer of the HTTP/2 response. Use context.abort() on the server to return a typed error, and catch grpc.RpcError on the client:

# Error handling patterns
import grpc
import finance_pb2
import finance_pb2_grpc

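# Server side: abort with a status code
# (a method of CurrencyConverterServicer; get_rate comes from finance_server.py)
def Convert(self, request, context):
    supported = {'USD', 'EUR', 'GBP', 'JPY'}
    if request.from_currency not in supported:
        # abort() raises, so nothing after it runs for this request
        context.abort(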
            grpc.StatusCode.INVALID_ARGUMENT,
            f"Unsupported currency: {request.from_currency}"
        )
    if request.amount <= 0:
        context.abort(
            grpc.StatusCode.OUT_OF_RANGE,
            f"Amount must be positive, got: {request.amount}"
        )
    rate = get_rate(request.from_currency, request.to_currency)
    return finance_pb2.ConvertReply(converted_amount=request.amount * rate, rate=rate)

# Client side: catch RpcError
with grpc.insecure_channel('localhost:50051') as channel:
    stub = finance_pb2_grpc.CurrencyConverterStub(channel)
    try:
        reply = stub.Convert(finance_pb2.ConvertRequest(
            from_currency='BTC', to_currency='EUR', amount=1.0
        ))
    except grpc.RpcError as e:
        print(f"gRPC error: {e.code()} — {e.details()}")
        # Output: gRPC error: StatusCode.INVALID_ARGUMENT — Unsupported currency: BTC

Figure: gRPC status codes checklist. grpc.StatusCode: typed errors that cross language boundaries without ambiguity.

Frequently Asked Questions

When should I use gRPC instead of REST?

Use gRPC when you control both client and server (internal microservices), when you need high-throughput low-latency calls, when you want a strict schema that fails fast on contract violations, or when you need streaming. REST is better for public APIs consumed by browsers and third parties — the JSON format is universally readable and HTTP/1.1 works everywhere. Many systems use REST at the edge and gRPC internally.

How do I evolve a proto schema without breaking clients?

Add new fields with new field numbers; older clients simply ignore them. Never renumber an existing field or change its type once deployed. Renaming a field is safe on the wire (the binary format uses field numbers, not names), but it changes the generated attribute names and the JSON mapping, so code that uses the old name must be updated. If you need to remove a field, delete it and mark its number as reserved so future developers cannot accidentally reuse it. For breaking changes, put a major version in the package name (e.g., package finance.v2).
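
The "older clients ignore new fields" behavior follows directly from how a reader walks the encoded bytes. The sketch below is a hand-written parser in plain Python, not the protobuf library. It handles only varint and length-delimited fields, and the field named locale is a hypothetical addition used for illustration.

```python
def read_varint(data: bytes, pos: int):
    """Decode a base-128 varint at pos; return (value, next position)."""
    value = shift = 0
    while True:
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7

def parse_known_fields(data: bytes, known: dict) -> dict:
    """Parse fields, keeping only the numbers the reader's schema knows.

    `known` maps field number -> field name as the reader sees it.
    """
    result, pos = {}, 0
    while pos < len(data):
        tag, pos = read_varint(data, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:      # varint
            value, pos = read_varint(data, pos)
        elif wire_type == 2:    # length-delimited
            length, pos = read_varint(data, pos)
            value, pos = data[pos:pos + length], pos + length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        if field_number in known:   # unknown numbers are skipped, not errors
            result[known[field_number]] = value
    return result

# A newer writer sends field 1 ("World") plus a hypothetical new field 2 ("en").
new_message = bytes.fromhex("0a05576f726c64") + bytes.fromhex("1202656e")

print(parse_known_fields(new_message, {1: "name"}))               # {'name': b'World'}
print(parse_known_fields(new_message, {1: "name", 2: "locale"}))  # {'name': b'World', 'locale': b'en'}
print(parse_known_fields(new_message, {1: "full_name"}))          # {'full_name': b'World'}
```

The first call plays the old client: it steps over field 2 because the tag tells it how many bytes to skip. The last call shows that a rename changes nothing on the wire, since only the number 1 is encoded.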

Can I use gRPC with Python async/await?

Yes — grpcio ships grpc.aio, an asyncio-native implementation. Replace grpc.server() with grpc.aio.server(), replace grpc.insecure_channel() with grpc.aio.insecure_channel(), and add async/await to your servicer methods. The grpc.aio module is production-ready as of grpcio 1.32 and is the recommended approach for new services using asyncio frameworks like FastAPI or Starlette.
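
The shape of an async streaming handler can be previewed with the standard library alone. The sketch below is not grpc.aio code: it is a plain asyncio async generator that mirrors how a server-streaming method such as WatchRates might look once it is written with async/await. The placeholder rate and the tuple return type are assumptions for illustration.

```python
import asyncio

async def watch_rates(targets, ticks, interval=0.0):
    """Async-generator shape of a server-streaming handler."""
    for _ in range(ticks):
        for target in targets:
            yield (target, 0.92)        # placeholder rate, not real data
        # Unlike time.sleep(), this hands control back to the event loop
        await asyncio.sleep(interval)

async def collect():
    """Consume the stream the way an async client loop would."""
    return [update async for update in watch_rates(["EUR", "GBP"], ticks=2)]

updates = asyncio.run(collect())
print(len(updates))  # 4
```

The key difference from the threaded server is the await on the sleep: a waiting stream no longer occupies a worker thread, so many open streams can share one event loop.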

How do I add TLS to a gRPC server?

Replace add_insecure_port with add_secure_port and pass the object returned by grpc.ssl_server_credentials(), which takes the contents of your private key and certificate chain as bytes (read the files yourself rather than passing paths). On the client side, replace grpc.insecure_channel() with grpc.secure_channel('host:443', grpc.ssl_channel_credentials()). For mutual TLS (mTLS), pass root certificates and the client cert/key to grpc.ssl_channel_credentials(), and on the server pass the trusted root certificates with require_client_auth=True to grpc.ssl_server_credentials(). In Kubernetes environments, service meshes like Istio handle mTLS transparently without application code changes.

How do I test a gRPC service without writing a client?

Install grpcio-reflection and enable server reflection, then use grpcurl (a command-line tool like curl for gRPC) or Postman (which supports gRPC natively). Alternatively, write a short Python script against the stubs generated by grpc_tools.protoc, or write unit tests with the grpc_testing module (shipped in the separate grpcio-testing package). Because servicer methods are ordinary Python methods, you can also call them directly with a fake context, without a running server.
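
Calling servicer logic directly, without a running server, can be as simple as passing in a hand-rolled context object. The sketch below is one way to do that, under stated assumptions: FakeContext, AbortError, and FakeConvertRequest are hypothetical stand-ins written for this example, and status codes are plain strings so the block needs no gRPC import. In real tests you would pass grpc.StatusCode members and the generated finance_pb2.ConvertRequest.

```python
from dataclasses import dataclass

class AbortError(Exception):
    """Raised by FakeContext.abort so the handler stops, as a real abort would."""
    def __init__(self, code, details):
        super().__init__(f"{code}: {details}")
        self.code, self.details = code, details

class FakeContext:
    """Hypothetical stand-in for the servicer context; only abort() is modeled."""
    def abort(self, code, details):
        raise AbortError(code, details)

@dataclass
class FakeConvertRequest:
    """Stand-in for finance_pb2.ConvertRequest with the same field names."""
    from_currency: str
    to_currency: str
    amount: float

SUPPORTED = {"USD", "EUR", "GBP", "JPY"}

def validate_convert(request, context):
    """Validation shaped like the checks in the Convert error-handling example."""
    if request.from_currency not in SUPPORTED:
        context.abort("INVALID_ARGUMENT",
                      f"Unsupported currency: {request.from_currency}")
    if request.amount <= 0:
        context.abort("OUT_OF_RANGE",
                      f"Amount must be positive, got: {request.amount}")
    return True

if __name__ == "__main__":
    try:
        validate_convert(FakeConvertRequest("BTC", "EUR", 1.0), FakeContext())
    except AbortError as err:
        print(err.code, "-", err.details)
```

Keeping validation in a function that only needs request and context makes it easy to cover every error branch in fast unit tests, and to leave the networked path for a smaller number of integration tests.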

Conclusion

gRPC turns service-to-service communication into a typed function call. The .proto schema defines the contract, grpc_tools.protoc generates the boilerplate, and grpcio handles the HTTP/2 transport, multiplexing, and serialization. In this article we covered the four RPC types — unary, server streaming, client streaming, and bidirectional streaming — along with typed error handling using grpc.StatusCode. The key rules are: never change field numbers in deployed schemas, always use context.abort() for typed server errors, and catch grpc.RpcError on the client side.

For production use, add TLS via grpc.ssl_server_credentials(), enable server reflection for debugging, add a health check service via grpcio-health-checking, and consider the grpc.aio asyncio API if your service is built around async Python frameworks. When you need to expose a gRPC backend to browsers or mobile clients, grpc-gateway or Envoy proxy can translate REST/JSON calls to gRPC automatically at the edge.