Pular para conteúdo

Modelo de Transação

Esta é uma das decisões arquiteturais mais fáceis de violar sem perceber. Leia até o fim antes de implementar qualquer operação de escrita.


A regra central

Apenas Use Cases abrem e fecham transações. Repositories, Services e Query Handlers nunca criam nem gerenciam UoW — eles recebem tx: Transaction como parâmetro e apenas o utilizam.

# ✅ Correto — UC é o dono
class CreateAgentUC:
    async def execute(self, command: CreateAgentCommand) -> CreateAgentResult:
        async with self.uow.begin() as tx:  # abre aqui
            agent = Agent(tenant_id=command.tenant_id, ...)
            await self.repo.save(tx, agent)
            await self.audit.record(tx, ...)
        # fecha aqui — side effects DEPOIS
        await self.notifications.send(...)

# ❌ Errado — Repository abrindo transação própria
class AgentRepository:
    async def save(self, agent: Agent) -> Agent:
        async with self.session_maker() as session:  # NUNCA faça isso
            ...

Se você estiver escrevendo async with ou session_maker() dentro de um repository ou service, está violando esta regra.


As três zonas

Toda operação de escrita se divide em três zonas com regras distintas:

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
ZONA 1 — ANTES da transação
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  IO externo lento e preparatório
  └─ Buscar credenciais no vault
  └─ Fazer parse de arquivo enviado pelo usuário
  └─ Chamar API externa para validar dados
  └─ Qualquer operação que pode demorar segundos

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
ZONA 2 — DENTRO da transação   ← UoW.begin() aqui
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Tudo que toca o banco de dados
  └─ repo.get_by_id(tx, ...)      — leitura de domínio
  └─ repo.save(tx, entity)        — escrita principal
  └─ audit.record(tx, event)      — log de auditoria
  └─ usage_counter.increment(tx)  — contadores de uso

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
ZONA 3 — APÓS o commit
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Side effects que dependem do sucesso da escrita
  └─ Enviar email ou notificação push
  └─ Disparar webhook para sistema externo
  └─ Publicar evento em fila (Celery task)
  └─ Invalidar cache

Por que essa divisão importa

IO externo dentro da transação segura um lock de banco enquanto espera uma resposta HTTP. Se a API externa demorar 3 segundos, você está bloqueando conexões do pool por 3 segundos. Em carga, isso esgota o pool inteiro.

Side effects antes do commit causam notificações sobre operações que ainda podem falhar. Se você envia email de confirmação e o commit falha logo depois, o usuário recebeu uma confirmação de algo que não aconteceu. Não tem como desfazer.


Exemplo completo — as três zonas no código

class ImportDocumentUC:
    async def execute(self, command: ImportDocumentCommand) -> ImportDocumentResult:

        # ── ZONA 1: IO externo preparatório ──────────────────────────────
        # Parse pode demorar — fora da transação
        parsed_content = await self.parser.extract_text(command.file_url)

        # Busca credenciais antes de abrir o lock
        credentials = await self.vault.get(command.connection_id)

        # ── ZONA 2: Transação ─────────────────────────────────────────────
        async with self.uow.begin() as tx:

            # Leitura de domínio dentro da tx
            kb = await self.kb_repo.require_by_id(tx, command.kb_id, command.tenant_id)

            # Valida regra de negócio
            if kb.status != KnowledgeBaseStatus.ACTIVE:
                raise KnowledgeBaseNotActiveError(kb_id=command.kb_id)

            # Cria entidade imutável
            document = Document(
                tenant_id=command.tenant_id,
                knowledge_base_id=kb.id,
                content=parsed_content,
                status=DocumentStatus.PENDING,
            )

            # Persiste
            await self.doc_repo.save(tx, document)

            # Audit dentro da mesma tx — ou falha junto ou salva junto
            await self.audit.record(tx, AuditEvent(
                actor=command.actor,
                action="document.imported",
                resource_id=str(document.id),
            ))

        # ── ZONA 3: Side effects após commit ─────────────────────────────
        # Só chega aqui se o commit foi bem sucedido
        await self.task_queue.enqueue(
            "index_document",
            document_id=str(document.id),
        )

        return ImportDocumentResult(document_id=document.id)

Erros comuns e como identificá-los

❌ Abrir transação dentro de um Service

# Errado — Service não gerencia UoW
class KnowledgeBaseService:
    async def add_document(self, kb_id: KnowledgeBaseID, content: str):
        async with self.uow.begin() as tx:  # ← viola a regra
            ...

Por que é problema: o UC que chama esse service vai ter sua própria transação. Você acaba com transações aninhadas — ou vai dar erro, ou vai criar comportamento imprevisível dependendo do driver.

Correção: o Service recebe tx do UC:

class KnowledgeBaseService:
    async def add_document(self, tx: Transaction, kb_id: KnowledgeBaseID, content: str):
        await self.doc_repo.save(tx, Document(...))

❌ Side effect dentro da transação

async with self.uow.begin() as tx:
    await self.repo.save(tx, agent)
    await self.email.send_welcome(agent.owner_email)  # ← dentro da tx

Por que é problema: se send_welcome demorar 2 segundos (timeout de SMTP), você segurou o lock por 2 segundos. Se o commit falhar depois do email, o usuário recebeu um email de uma operação que não existiu.

Correção: mova o email para a Zona 3:

async with self.uow.begin() as tx:
    await self.repo.save(tx, agent)

await self.email.send_welcome(agent.owner_email)  # ← após commit

❌ IO externo dentro da transação

async with self.uow.begin() as tx:
    credentials = await self.vault.get(connection_id)  # ← chamada HTTP dentro da tx
    await self.repo.save(tx, entity)

Correção: busque credenciais na Zona 1, antes de abrir a transação.


❌ Commit parcial — audit fora da transação

async with self.uow.begin() as tx:
    await self.repo.save(tx, entity)

# Commit aconteceu — se audit falhar aqui, operação ficou sem log
await self.audit.record(tx_novo, event)  # ← tx diferente

Correção: audit e a escrita principal devem estar na mesma transação. Se o audit falhar, a operação inteira falha — o que é o comportamento correto para rastreabilidade.


Transações em Workers (Celery)

Workers seguem o mesmo modelo, com uma particularidade: o payload da task deve conter apenas IDs e primitivos — nunca objetos de domínio serializados. O UC é instanciado dentro da task e abre sua própria transação:

@celery_app.task
def index_document_task(document_id: str) -> None:
    run_async(
        container.index_document_uc.execute(
            IndexDocumentCommand(document_id=DocumentID(document_id))
        )
    )

A task não conhece UoW. Ela delega para um UC que segue o mesmo three-zone model.

Tratamento de erro: se uma task falha após o commit (durante side effects da Zona 3), não re-abra a mesma transação. Use uma nova sessão para registrar o erro de forma independente.


Referência rápida para code review

Ao revisar um PR, procure por estes sinais de alerta:

O que ver no código O que significa
async with self.uow dentro de Repository ou Service Violação — UoW só em UC
session_maker() fora de infraestrutura compartilhada Transação aberta no lugar errado
Email/webhook/Celery task dentro do async with uow Side effect na Zona 2 — mover para Zona 3
Chamada HTTP/vault dentro do async with uow IO externo na transação — mover para Zona 1
audit.record em tx diferente da escrita principal Audit desacoplado — risco de operação sem log