Python CQRS
Event-Driven Architecture Framework for Distributed Systems
Breaking Changes in v5.0.0
Starting with version 5.0.0, Pydantic support will become optional. The default implementations of Request, Response, DomainEvent, and NotificationEvent will be migrated to dataclasses-based implementations.
Core FeaturesΒΆ
-
Bootstrap
Quick project setup and configuration with automatic DI container setup.
-
Request Handlers
Handle commands and queries with full type safety and async support.
-
Saga Pattern
Orchestrated Saga for distributed transactions with automatic compensation.
-
Event Handling
Process domain events with parallel processing and runtime execution.
-
Transaction Outbox
Guaranteed event delivery with at-least-once semantics.
-
Chain of Responsibility
Sequential request processing with flexible handler chaining.
-
Streaming
Incremental processing with real-time progress updates via SSE.
-
Integrations
FastAPI and FastStream integrations out of the box.
-
Mermaid Diagrams
Visualize architecture patterns and flows with interactive Mermaid diagrams.
What is it?ΒΆ
Python CQRS is a framework for implementing the CQRS (Command Query Responsibility Segregation) pattern in Python applications. It helps separate read and write operations, improving scalability, performance, and code maintainability.
Key Highlights:
- Performance β Separation of commands and queries, parallel event processing
- Reliability β Transaction Outbox for guaranteed event delivery, Saga with compensation support and eventual consistency
- Flexible Types β Easy integration with any type: Pydantic, dataclasses, msgspec, attrs, TypedDict and more
- Ready Integrations β FastAPI and FastStream out of the box
- Simple Setup β Bootstrap for quick configuration
- Proven Patterns β CQRS, Saga, Outbox and more to keep services decoupled and maintainable
Project statusΒΆ
InstallationΒΆ
Install Python CQRS using pip or uv:
Using pip:
Using uv:
Requirements
Python 3.10+
Quick StartΒΆ
import di
import cqrs
from cqrs.requests import bootstrap
# Define command, response and handler
class CreateUserCommand(cqrs.Request):
email: str
name: str
class CreateUserResponse(cqrs.Response):
user_id: str
class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
# Your business logic here
user_id = f"user_{request.email}"
return CreateUserResponse(user_id=user_id)
# Bootstrap and use
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)
result = await mediator.send(CreateUserCommand(email="user@example.com", name="John"))
See Bootstrap for detailed setup instructions.