Modelo de Transação¶
Esta é uma das decisões arquiteturais mais fáceis de violar sem perceber. Leia até o fim antes de implementar qualquer operação de escrita.
A regra central¶
Apenas Use Cases abrem e fecham transações. Repositories, Services e Query Handlers nunca criam nem gerenciam UoW — eles recebem tx: Transaction como parâmetro e apenas o utilizam.
# ✅ Correto — UC é o dono
class CreateAgentUC:
async def execute(self, command: CreateAgentCommand) -> CreateAgentResult:
async with self.uow.begin() as tx: # abre aqui
agent = Agent(tenant_id=command.tenant_id, ...)
await self.repo.save(tx, agent)
await self.audit.record(tx, ...)
# fecha aqui — side effects DEPOIS
await self.notifications.send(...)
# ❌ Errado — Repository abrindo transação própria
class AgentRepository:
async def save(self, agent: Agent) -> Agent:
async with self.session_maker() as session: # NUNCA faça isso
...
Se você estiver escrevendo async with ou session_maker() dentro de um repository ou service, está violando esta regra.
As três zonas¶
Toda operação de escrita se divide em três zonas com regras distintas:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
ZONA 1 — ANTES da transação
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
IO externo lento e preparatório
└─ Buscar credenciais no vault
└─ Fazer parse de arquivo enviado pelo usuário
└─ Chamar API externa para validar dados
└─ Qualquer operação que pode demorar segundos
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
ZONA 2 — DENTRO da transação ← UoW.begin() aqui
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Tudo que toca o banco de dados
└─ repo.get_by_id(tx, ...) — leitura de domínio
└─ repo.save(tx, entity) — escrita principal
└─ audit.record(tx, event) — log de auditoria
└─ usage_counter.increment(tx) — contadores de uso
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
ZONA 3 — APÓS o commit
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Side effects que dependem do sucesso da escrita
└─ Enviar email ou notificação push
└─ Disparar webhook para sistema externo
└─ Publicar evento em fila (Celery task)
└─ Invalidar cache
Por que essa divisão importa¶
IO externo dentro da transação segura um lock de banco enquanto espera uma resposta HTTP. Se a API externa demorar 3 segundos, você está bloqueando conexões do pool por 3 segundos. Em carga, isso esgota o pool inteiro.
Side effects antes do commit causam notificações sobre operações que ainda podem falhar. Se você envia email de confirmação e o commit falha logo depois, o usuário recebeu uma confirmação de algo que não aconteceu. Não tem como desfazer.
Exemplo completo — as três zonas no código¶
class ImportDocumentUC:
async def execute(self, command: ImportDocumentCommand) -> ImportDocumentResult:
# ── ZONA 1: IO externo preparatório ──────────────────────────────
# Parse pode demorar — fora da transação
parsed_content = await self.parser.extract_text(command.file_url)
# Busca credenciais antes de abrir o lock
credentials = await self.vault.get(command.connection_id)
# ── ZONA 2: Transação ─────────────────────────────────────────────
async with self.uow.begin() as tx:
# Leitura de domínio dentro da tx
kb = await self.kb_repo.require_by_id(tx, command.kb_id, command.tenant_id)
# Valida regra de negócio
if kb.status != KnowledgeBaseStatus.ACTIVE:
raise KnowledgeBaseNotActiveError(kb_id=command.kb_id)
# Cria entidade imutável
document = Document(
tenant_id=command.tenant_id,
knowledge_base_id=kb.id,
content=parsed_content,
status=DocumentStatus.PENDING,
)
# Persiste
await self.doc_repo.save(tx, document)
# Audit dentro da mesma tx — ou falha junto ou salva junto
await self.audit.record(tx, AuditEvent(
actor=command.actor,
action="document.imported",
resource_id=str(document.id),
))
# ── ZONA 3: Side effects após commit ─────────────────────────────
# Só chega aqui se o commit foi bem sucedido
await self.task_queue.enqueue(
"index_document",
document_id=str(document.id),
)
return ImportDocumentResult(document_id=document.id)
Erros comuns e como identificá-los¶
❌ Abrir transação dentro de um Service¶
# Errado — Service não gerencia UoW
class KnowledgeBaseService:
async def add_document(self, kb_id: KnowledgeBaseID, content: str):
async with self.uow.begin() as tx: # ← viola a regra
...
Por que é problema: o UC que chama esse service vai ter sua própria transação. Você acaba com transações aninhadas — ou vai dar erro, ou vai criar comportamento imprevisível dependendo do driver.
Correção: o Service recebe tx do UC:
class KnowledgeBaseService:
async def add_document(self, tx: Transaction, kb_id: KnowledgeBaseID, content: str):
await self.doc_repo.save(tx, Document(...))
❌ Side effect dentro da transação¶
async with self.uow.begin() as tx:
await self.repo.save(tx, agent)
await self.email.send_welcome(agent.owner_email) # ← dentro da tx
Por que é problema: se send_welcome demorar 2 segundos (timeout de SMTP), você segurou o lock por 2 segundos. Se o commit falhar depois do email, o usuário recebeu um email de uma operação que não existiu.
Correção: mova o email para a Zona 3:
async with self.uow.begin() as tx:
await self.repo.save(tx, agent)
await self.email.send_welcome(agent.owner_email) # ← após commit
❌ IO externo dentro da transação¶
async with self.uow.begin() as tx:
credentials = await self.vault.get(connection_id) # ← chamada HTTP dentro da tx
await self.repo.save(tx, entity)
Correção: busque credenciais na Zona 1, antes de abrir a transação.
❌ Commit parcial — audit fora da transação¶
async with self.uow.begin() as tx:
await self.repo.save(tx, entity)
# Commit aconteceu — se audit falhar aqui, operação ficou sem log
await self.audit.record(tx_novo, event) # ← tx diferente
Correção: audit e a escrita principal devem estar na mesma transação. Se o audit falhar, a operação inteira falha — o que é o comportamento correto para rastreabilidade.
Transações em Workers (Celery)¶
Workers seguem o mesmo modelo, com uma particularidade: o payload da task deve conter apenas IDs e primitivos — nunca objetos de domínio serializados. O UC é instanciado dentro da task e abre sua própria transação:
@celery_app.task
def index_document_task(document_id: str) -> None:
run_async(
container.index_document_uc.execute(
IndexDocumentCommand(document_id=DocumentID(document_id))
)
)
A task não conhece UoW. Ela delega para um UC que segue o mesmo three-zone model.
Tratamento de erro: se uma task falha após o commit (durante side effects da Zona 3), não re-abra a mesma transação. Use uma nova sessão para registrar o erro de forma independente.
Referência rápida para code review¶
Ao revisar um PR, procure por estes sinais de alerta:
| O que ver no código | O que significa |
|---|---|
async with self.uow dentro de Repository ou Service |
Violação — UoW só em UC |
session_maker() fora de infraestrutura compartilhada |
Transação aberta no lugar errado |
Email/webhook/Celery task dentro do async with uow |
Side effect na Zona 2 — mover para Zona 3 |
Chamada HTTP/vault dentro do async with uow |
IO externo na transação — mover para Zona 1 |
audit.record em tx diferente da escrita principal |
Audit desacoplado — risco de operação sem log |