On the surface, the requirement seemed straightforward: process a user-uploaded document through a series of NLP operations—text extraction, sentiment analysis, named entity recognition—and finally, write the structured results to an analytics database and update an index. This entire workflow needed to be atomic; either it all succeeds, or it’s as if nothing ever happened. In a monolithic application with a single database, a TransactionScope would have wrapped this up nicely. But in our reality, text extraction, sentiment analysis, and entity recognition are three independent services, potentially deployed in different locations, each with its own database or state store. Traditional distributed transactions (two-phase commit, or 2PC) are all but unusable in modern microservices architectures due to their synchronous, blocking nature and their reliance on specific coordinator infrastructure.
The problem thus transforms into: how can we provide ACID-like atomicity for a business process spanning multiple services without using 2PC? This question leads us squarely to the Saga pattern. However, pulling in a full-fledged message queue (like RabbitMQ) and a heavyweight Saga framework (like MassTransit or NServiceBus) felt like overkill for our highly cohesive module, dramatically increasing operational overhead. What we needed was a solution that was lightweight, self-contained, and easy to understand and maintain.
The final decision was to build an orchestration-based Saga coordinator from scratch within our ASP.NET Core application using C#. It would persist the business process’s state in the application’s primary database (PostgreSQL), thereby achieving long-running process reliability and compensation capabilities without introducing significant external dependencies.
Defining the Core Saga Contract
The design begins with a clear contract. A Saga consists of multiple steps, each comprising a forward action and a compensating action.
// ISagaStep.cs
/// <summary>
/// Defines a single, independent step within a Saga workflow.
/// Each step must be idempotent and provide a compensation action.
/// </summary>
/// <typeparam name="TContext">The type of the context object for the Saga execution.</typeparam>
public interface ISagaStep<TContext> where TContext : class
{
    /// <summary>
    /// The name of the step, used for logging and state tracking.
    /// </summary>
    string Name { get; }

    /// <summary>
    /// The forward action: executes the business logic for this step.
    /// </summary>
    /// <param name="context">The shared context for the Saga, used to pass data between steps.</param>
    /// <returns>A Task representing the asynchronous operation.</returns>
    Task ExecuteAsync(TContext context);

    /// <summary>
    /// The compensating action: reverts the changes made by this step if a subsequent step fails.
    /// </summary>
    /// <param name="context">The shared context for the Saga.</param>
    /// <returns>A Task representing the asynchronous compensation operation.</returns>
    Task CompensateAsync(TContext context);
}
This interface is the cornerstone of the entire pattern. TContext is key; it acts as a state carrier, passed through the Saga’s entire lifecycle, connecting all the steps.
Next is the persistence model for the Saga itself. We need an entity to record the execution progress of each Saga instance. This is critical for recovery after a system crash.
// SagaState.cs
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
public class SagaState
{
    [Key]
    public Guid Id { get; set; }

    [Required]
    public string SagaType { get; set; }

    [Required]
    public int CurrentStep { get; set; }

    [Required]
    public SagaStatus Status { get; set; }

    // Using a JSONB/JSON column for high flexibility.
    [Required]
    [Column(TypeName = "jsonb")]
    public string ContextData { get; set; }

    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
}

public enum SagaStatus
{
    Pending,
    Executing,
    Compensating,
    Succeeded,
    Failed
}
Using Entity Framework Core, this SagaState class is mapped to a database table. The ContextData field uses jsonb (in PostgreSQL) or nvarchar(max) (in SQL Server) to serialize our TContext object. This provides immense flexibility, avoiding the need to design a new state table for every type of Saga workflow.
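To make the mapping concrete, here is a minimal sketch of what the DbContext configuration might look like. It assumes the `YourDbContext` name used by the orchestrator; the string conversion for the enum and the index on `Status` are suggestions, not requirements of the pattern.

```csharp
using Microsoft.EntityFrameworkCore;

// Sketch only: the enum-to-string conversion and Status index are illustrative
// choices, not part of the Saga pattern itself.
public class YourDbContext : DbContext
{
    public YourDbContext(DbContextOptions<YourDbContext> options) : base(options) { }

    public DbSet<SagaState> SagaStates => Set<SagaState>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        var saga = modelBuilder.Entity<SagaState>();

        // Store the status as readable text instead of an integer.
        saga.Property(s => s.Status).HasConversion<string>();

        // A recovery job will scan by status, so an index pays off.
        saga.HasIndex(s => s.Status);

        // On SQL Server, override the jsonb annotation from the entity:
        // saga.Property(s => s.ContextData).HasColumnType("nvarchar(max)");
    }
}
```

The index on `Status` matters later: the recovery process described at the end of this article repeatedly queries for sagas stuck in `Executing` or `Compensating`.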
Building the Saga Orchestrator
The orchestrator is the heart of the Saga pattern. It’s responsible for executing each step in sequence, recording progress, and triggering the compensation flow upon failure.
// SagaOrchestrator.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System.Text.Json;
public class SagaOrchestrator<TContext> where TContext : class
{
    private readonly ILogger<SagaOrchestrator<TContext>> _logger;
    private readonly YourDbContext _dbContext;
    private readonly IReadOnlyList<ISagaStep<TContext>> _steps;
    private readonly string _sagaType;

    public SagaOrchestrator(
        ILogger<SagaOrchestrator<TContext>> logger,
        YourDbContext dbContext,
        IEnumerable<ISagaStep<TContext>> steps)
    {
        _logger = logger;
        _dbContext = dbContext;
        _steps = steps.ToList();
        _sagaType = typeof(TContext).Name; // Use the context type as the Saga type identifier
    }

    public async Task<Guid> ExecuteAsync(TContext context)
    {
        var state = new SagaState
        {
            Id = Guid.NewGuid(),
            SagaType = _sagaType,
            CurrentStep = 0,
            Status = SagaStatus.Executing,
            ContextData = JsonSerializer.Serialize(context)
        };

        // Crucial: the creation of the Saga state is itself a transaction.
        _dbContext.SagaStates.Add(state);
        await _dbContext.SaveChangesAsync();
        _logger.LogInformation("Saga {SagaId} of type {SagaType} started.", state.Id, _sagaType);

        try
        {
            for (int i = 0; i < _steps.Count; i++)
            {
                state.CurrentStep = i;
                // Persist the current progress *before* executing the business logic.
                // This is the core of recoverability. If the server crashes during ExecuteAsync,
                // on restart we know whether to retry step `i` or to compensate. It also means
                // a recovered Saga may re-run a step that partially completed, which is why
                // every step must be idempotent.
                await UpdateStateAsync(state);

                var currentStep = _steps[i];
                _logger.LogInformation("Saga {SagaId}, executing step: {StepName}", state.Id, currentStep.Name);
                await currentStep.ExecuteAsync(context);

                // After successful execution, snapshot the context for the next step.
                // It is persisted by the next UpdateStateAsync call (or by the finally block).
                state.ContextData = JsonSerializer.Serialize(context);
            }
            state.Status = SagaStatus.Succeeded;
            _logger.LogInformation("Saga {SagaId} succeeded.", state.Id);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Saga {SagaId} failed at step {StepIndex}. Starting compensation.", state.Id, state.CurrentStep);
            await CompensateAsync(state, context);
            // Re-throw the original exception so the caller knows the operation failed.
            throw;
        }
        finally
        {
            // Final state update.
            await UpdateStateAsync(state);
        }
        return state.Id;
    }

    private async Task CompensateAsync(SagaState state, TContext context)
    {
        state.Status = SagaStatus.Compensating;
        await UpdateStateAsync(state);
        _logger.LogWarning("Saga {SagaId} entering compensation mode.", state.Id);

        // Starting from the failed step, execute compensation actions in reverse order.
        // The failed step is compensated too: its CompensateAsync must tolerate partial
        // or absent work (hence the nullable resource-ID checks in the concrete steps).
        for (int i = state.CurrentStep; i >= 0; i--)
        {
            // In a real project, you might need to deserialize historical versions from state.ContextData.
            var stepToCompensate = _steps[i];
            try
            {
                _logger.LogWarning("Saga {SagaId}, compensating step: {StepName}", state.Id, stepToCompensate.Name);
                await stepToCompensate.CompensateAsync(context);
                // Update progress to show this step has been successfully compensated.
                state.CurrentStep = i - 1;
                await UpdateStateAsync(state);
            }
            catch (Exception compEx)
            {
                // A compensation failure is a critical problem requiring manual intervention.
                _logger.LogCritical(compEx, "Saga {SagaId} CRITICAL FAILURE: Compensation for step {StepName} failed.", state.Id, stepToCompensate.Name);
                // The Saga is now in a "dirty" state and cannot continue automatic compensation.
                // The status remains Compensating, and CurrentStep points to the failed compensation step.
                // This should trigger an alert for ops to handle.
                return;
            }
        }
        // All compensations succeeded: the Saga terminates as Failed (fully rolled back)
        // rather than lingering in Compensating. The finally block in ExecuteAsync persists this.
        state.Status = SagaStatus.Failed;
        _logger.LogWarning("Saga {SagaId} compensation completed.", state.Id);
    }

    private async Task UpdateStateAsync(SagaState state)
    {
        state.UpdatedAt = DateTime.UtcNow;
        // The state entity is already tracked, so SaveChangesAsync persists its changes.
        // This save is a separate transaction, deliberately decoupled from the
        // business logic's transactions.
        await _dbContext.SaveChangesAsync();
    }
}
This orchestrator implementation has several key design considerations:
- Persistence First: Before executing any step’s `ExecuteAsync`, the orchestrator first updates the `SagaState` row in the database, setting `CurrentStep` to the current step’s index. If the system crashes during step execution, a recovery process simply needs to read the `SagaState` table to know which Saga instances were interrupted and at which step, allowing it to decide whether to retry or compensate.
- Compensation Flow: When any `ExecuteAsync` throws an exception, `CompensateAsync` is triggered. It starts from the failed step and invokes the `CompensateAsync` method of all previously completed steps in reverse order.
- Handling Compensation Failures: The compensation action itself can fail. This is a critical problem, logged with `LogCritical` in the code, which halts further automatic compensation. Such “dirty” Sagas require manual intervention; in a production environment, this would trigger a high-priority alert.
- Separation of State and Business Logic: The `SaveChangesAsync` for the Saga state and the data operations within a business step should run in different transactions. The orchestrator is responsible only for the durability of the workflow state, while each `ISagaStep` is responsible for atomicity within its own business domain.
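The persistence-first design has a direct consequence for step authors: because progress is persisted before a step runs, a recovered Saga may re-execute a step whose work partially completed. A step should therefore guard against duplicating its effects. The sketch below illustrates one way to do that for the text-extraction step; FindByDocumentIdAsync is a hypothetical lookup method, not part of the interfaces defined in this article.

```csharp
// Illustrative idempotency guard for a step's ExecuteAsync.
// FindByDocumentIdAsync is a hypothetical method on the extraction service.
public async Task ExecuteAsync(DocumentProcessingContext context)
{
    // If a previous (crashed) run already produced the resource,
    // reuse it instead of creating a duplicate.
    var existing = await _textExtractionService.FindByDocumentIdAsync(context.DocumentId);
    if (existing is not null)
    {
        context.ExtractedText = existing.Text;
        context.TextResourceId = existing.ResourceId;
        return;
    }

    var result = await _textExtractionService.ExtractAsync(context.DocumentId, context.RawContent);
    context.ExtractedText = result.Text;
    context.TextResourceId = result.ResourceId;
}
```

An alternative to a lookup is passing a deterministic idempotency key (for example, the Saga ID plus step name) to the downstream service and letting it deduplicate.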
Applying to the NLP Workflow
Now, let’s apply this generic orchestrator to our specific NLP document processing workflow.
First, we define our context, DocumentProcessingContext.
// DocumentProcessingContext.cs
public class DocumentProcessingContext
{
    public Guid DocumentId { get; set; }
    public string RawContent { get; set; }
    public string ExtractedText { get; set; }
    public float? SentimentScore { get; set; }
    public List<string> Entities { get; set; } = new();

    // Track resource IDs created by each step for compensation.
    public Guid? TextResourceId { get; set; }
    public Guid? SentimentAnalysisId { get; set; }
    public Guid? EntityRecognitionId { get; set; }
}
The context object not only passes data but also records the IDs of resources created by each step, which is crucial for precise compensation.
Next, we implement ISagaStep for each part of the NLP flow.
// Steps/ExtractTextStep.cs
public class ExtractTextStep : ISagaStep<DocumentProcessingContext>
{
    private readonly ITextExtractionService _textExtractionService;

    public string Name => "TextExtraction";

    public ExtractTextStep(ITextExtractionService textExtractionService)
    {
        _textExtractionService = textExtractionService;
    }

    public async Task ExecuteAsync(DocumentProcessingContext context)
    {
        var result = await _textExtractionService.ExtractAsync(context.DocumentId, context.RawContent);
        context.ExtractedText = result.Text;
        context.TextResourceId = result.ResourceId; // Record the created resource ID
    }

    public async Task CompensateAsync(DocumentProcessingContext context)
    {
        if (context.TextResourceId.HasValue)
        {
            // Compensation: delete the extracted and stored text resource.
            await _textExtractionService.DeleteExtractedTextAsync(context.TextResourceId.Value);
        }
    }
}

// Steps/AnalyzeSentimentStep.cs
public class AnalyzeSentimentStep : ISagaStep<DocumentProcessingContext>
{
    private readonly ISentimentService _sentimentService;

    public string Name => "SentimentAnalysis";

    public AnalyzeSentimentStep(ISentimentService sentimentService)
    {
        _sentimentService = sentimentService;
    }

    public async Task ExecuteAsync(DocumentProcessingContext context)
    {
        // Simulate a potentially failing operation.
        if (context.ExtractedText.Contains("FAIL_SENTIMENT"))
        {
            throw new InvalidOperationException("Sentiment analysis failed due to specific content.");
        }
        var result = await _sentimentService.AnalyzeAsync(context.ExtractedText);
        context.SentimentScore = result.Score;
        context.SentimentAnalysisId = result.AnalysisId;
    }

    public async Task CompensateAsync(DocumentProcessingContext context)
    {
        if (context.SentimentAnalysisId.HasValue)
        {
            await _sentimentService.DeleteAnalysisResultAsync(context.SentimentAnalysisId.Value);
        }
    }
}

// Steps/RecognizeEntitiesStep.cs
public class RecognizeEntitiesStep : ISagaStep<DocumentProcessingContext>
{
    private readonly IEntityRecognitionService _entityService;

    public string Name => "EntityRecognition";

    public RecognizeEntitiesStep(IEntityRecognitionService entityService)
    {
        _entityService = entityService;
    }

    public async Task ExecuteAsync(DocumentProcessingContext context)
    {
        var result = await _entityService.RecognizeAsync(context.ExtractedText);
        context.Entities.AddRange(result.Entities);
        context.EntityRecognitionId = result.RecognitionId;
    }

    public async Task CompensateAsync(DocumentProcessingContext context)
    {
        if (context.EntityRecognitionId.HasValue)
        {
            await _entityService.DeleteRecognitionResultAsync(context.EntityRecognitionId.Value);
        }
    }
}
Each step encapsulates a single responsibility: it calls an external or internal service and knows how to undo its own actions.
Assembling and Using in ASP.NET Core
Finally, we register all the components in the dependency injection container in Program.cs or Startup.cs.
// Program.cs (Minimal API example)
var builder = WebApplication.CreateBuilder(args);

// 1. Register DbContext
builder.Services.AddDbContext<YourDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection")));

// 2. Register NLP services (mock implementations)
builder.Services.AddScoped<ITextExtractionService, MockTextExtractionService>();
builder.Services.AddScoped<ISentimentService, MockSentimentService>();
builder.Services.AddScoped<IEntityRecognitionService, MockEntityRecognitionService>();

// 3. Register Saga steps.
// The registration order is important: it defines the Saga's execution order.
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, ExtractTextStep>();
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, AnalyzeSentimentStep>();
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, RecognizeEntitiesStep>();

// 4. Register the Saga orchestrator
builder.Services.AddScoped<SagaOrchestrator<DocumentProcessingContext>>(provider =>
{
    var logger = provider.GetRequiredService<ILogger<SagaOrchestrator<DocumentProcessingContext>>>();
    var dbContext = provider.GetRequiredService<YourDbContext>();
    // Resolve all registered steps for this Saga type from the DI container.
    var steps = provider.GetServices<ISagaStep<DocumentProcessingContext>>();
    return new SagaOrchestrator<DocumentProcessingContext>(logger, dbContext, steps);
});

var app = builder.Build();

// 5. Create an API endpoint to trigger the Saga
app.MapPost("/process-document", async (string rawContent, SagaOrchestrator<DocumentProcessingContext> orchestrator) =>
{
    var context = new DocumentProcessingContext
    {
        DocumentId = Guid.NewGuid(),
        RawContent = rawContent
    };
    try
    {
        var sagaId = await orchestrator.ExecuteAsync(context);
        return Results.Ok(new { SagaId = sagaId, Message = "Document processing started and completed successfully." });
    }
    catch (Exception ex)
    {
        // The orchestrator re-throws the exception, allowing the API layer to return an appropriate error response.
        return Results.Problem(
            detail: $"Document processing failed and was rolled back. Error: {ex.Message}",
            statusCode: 500);
    }
});

app.Run();
Through dependency injection, the orchestrator automatically receives all registered ISagaStep<DocumentProcessingContext> instances upon creation, in the same order they were registered. This makes adding, removing, or reordering steps straightforward—just modify the registration code in Program.cs.
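Relying on implicit DI enumeration order works, but the ordering contract is easy to miss when registrations drift apart in a growing Program.cs. One way to make the order explicit at a single call site is a small extension method; AddSaga below is a hypothetical helper sketched for illustration, not a framework API.

```csharp
using Microsoft.Extensions.DependencyInjection;

// Sketch: concentrate a Saga's step order in one declaration.
// AddSaga is a hypothetical extension method, not part of ASP.NET Core.
public static class SagaRegistrationExtensions
{
    public static IServiceCollection AddSaga<TContext>(
        this IServiceCollection services,
        params Type[] stepTypes) where TContext : class
    {
        // The array order is the execution order.
        foreach (var stepType in stepTypes)
        {
            services.AddScoped(typeof(ISagaStep<TContext>), stepType);
        }
        // The orchestrator's constructor takes IEnumerable<ISagaStep<TContext>>,
        // so it can be registered without a hand-written factory.
        services.AddScoped<SagaOrchestrator<TContext>>();
        return services;
    }
}

// Usage:
// builder.Services.AddSaga<DocumentProcessingContext>(
//     typeof(ExtractTextStep), typeof(AnalyzeSentimentStep), typeof(RecognizeEntitiesStep));
```

This keeps the "order defines the workflow" rule visible in one place instead of being an emergent property of scattered AddScoped calls.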
Visualizing the Flow
Mermaid can be used to clearly illustrate the flow for both success and failure scenarios.
Success Flow:
sequenceDiagram
    participant Client
    participant API
    participant Orchestrator
    participant Step1 as ExtractText
    participant Step2 as AnalyzeSentiment
    participant Step3 as RecognizeEntities
    participant DB as SagaStateDB

    Client->>+API: POST /process-document
    API->>+Orchestrator: ExecuteAsync(context)
    Orchestrator->>+DB: Create SagaState (Status: Executing)
    DB-->>-Orchestrator: SagaId
    Orchestrator->>+DB: Update State (CurrentStep=0)
    DB-->>-Orchestrator: OK
    Orchestrator->>+Step1: ExecuteAsync()
    Step1-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (CurrentStep=1)
    DB-->>-Orchestrator: OK
    Orchestrator->>+Step2: ExecuteAsync()
    Step2-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (CurrentStep=2)
    DB-->>-Orchestrator: OK
    Orchestrator->>+Step3: ExecuteAsync()
    Step3-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (Status: Succeeded)
    DB-->>-Orchestrator: OK
    Orchestrator-->>-API: Returns SagaId
    API-->>-Client: 200 OK
Failure and Compensation Flow:
sequenceDiagram
    participant Client
    participant API
    participant Orchestrator
    participant Step1 as ExtractText
    participant Step2 as AnalyzeSentiment
    participant Step3 as RecognizeEntities
    participant DB as SagaStateDB

    Client->>+API: POST /process-document
    API->>+Orchestrator: ExecuteAsync(context)
    Note over Orchestrator, Step1: Step 1 (ExtractText) executes successfully.
    Orchestrator->>+Step1: ExecuteAsync()
    Step1-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (CurrentStep=1)
    DB-->>-Orchestrator: OK
    Note over Orchestrator, Step2: Step 2 (AnalyzeSentiment) fails.
    Orchestrator->>+Step2: ExecuteAsync()
    Step2-->>-Orchestrator: Throws Exception
    Orchestrator->>Orchestrator: Start Compensation
    Orchestrator->>+DB: Update State (Status: Compensating)
    DB-->>-Orchestrator: OK
    Note right of Orchestrator: Step 2 created no resources, so its compensation is a no-op. Step 1 is then rolled back.
    Orchestrator->>+Step2: CompensateAsync() (no-op)
    Step2-->>-Orchestrator: Compensation Success
    Orchestrator->>+Step1: CompensateAsync()
    Step1-->>-Orchestrator: Compensation Success
    Orchestrator->>+DB: Update State (CurrentStep=-1, Status: Failed)
    DB-->>-Orchestrator: OK
    Orchestrator-->>-API: Throws Exception
    API-->>-Client: 500 Internal Server Error
Limitations and Future Outlook
This lightweight, orchestration-based Saga coordinator excels at providing atomicity for complex business processes within a single service, avoiding the complexity of external message brokers. But its boundaries are also clear. First, it’s an “in-process” coordinator, making it unsuitable for scenarios spanning multiple, independently deployed microservices. Second, the Saga’s recovery mechanism is passive. If the application crashes mid-Saga, it requires an external, scheduled Recovery Job to scan the SagaState table, find instances stuck in Executing or Compensating status for too long, and attempt to re-trigger their execution or compensation logic.
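The passive recovery mentioned above can be hosted in the same application as a background worker. The sketch below only detects and reports stuck sagas; the staleness threshold, poll interval, and the decision of how to re-drive a saga (retry forward vs. compensate) are all illustrative choices a real implementation would need to make deliberately.

```csharp
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Sketch of a passive recovery job. The 10-minute staleness cutoff and
// 1-minute poll interval are illustrative, not recommendations.
public class SagaRecoveryJob : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<SagaRecoveryJob> _logger;

    public SagaRecoveryJob(IServiceScopeFactory scopeFactory, ILogger<SagaRecoveryJob> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            using var scope = _scopeFactory.CreateScope();
            var db = scope.ServiceProvider.GetRequiredService<YourDbContext>();

            var cutoff = DateTime.UtcNow.AddMinutes(-10);
            var stuck = await db.SagaStates
                .Where(s => (s.Status == SagaStatus.Executing || s.Status == SagaStatus.Compensating)
                            && s.UpdatedAt < cutoff)
                .ToListAsync(stoppingToken);

            foreach (var saga in stuck)
            {
                // A real implementation would deserialize saga.ContextData and
                // re-drive the orchestrator from saga.CurrentStep; here we only alert.
                _logger.LogWarning("Stuck saga {SagaId} at step {Step} with status {Status}.",
                    saga.Id, saga.CurrentStep, saga.Status);
            }

            await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
        }
    }
}
```

Registered with builder.Services.AddHostedService<SagaRecoveryJob>(), this turns the passive recovery described above into a concrete, if minimal, safety net.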
Future iterations could explore publishing Saga state changes as domain events, which a separate, high-availability Saga workflow engine could subscribe to and drive. This would decouple the process coordination responsibility from the main application, turning it into a more generic platform capability. Additionally, timeout and retry policies could be added to ISagaStep to handle transient failures in dependent services, further enhancing the workflow’s resilience.
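The retry idea sketched above composes naturally with the existing ISagaStep contract via a decorator. The attempt count and backoff schedule below are illustrative; note that retrying the forward action is only safe because steps are required to be idempotent, and compensation is deliberately not retried here since a compensation failure already escalates to manual intervention.

```csharp
// Sketch: a decorator that retries a step's forward action on failure.
// Max attempts and exponential backoff values are illustrative.
public class RetryingSagaStep<TContext> : ISagaStep<TContext> where TContext : class
{
    private readonly ISagaStep<TContext> _inner;
    private readonly int _maxAttempts;

    public RetryingSagaStep(ISagaStep<TContext> inner, int maxAttempts = 3)
    {
        _inner = inner;
        _maxAttempts = maxAttempts;
    }

    public string Name => _inner.Name;

    public async Task ExecuteAsync(TContext context)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await _inner.ExecuteAsync(context);
                return;
            }
            catch (Exception) when (attempt < _maxAttempts)
            {
                // Exponential backoff between attempts: 1s, 2s, 4s, ...
                // Safe only because ISagaStep requires idempotent forward actions.
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt - 1)));
            }
        }
    }

    // Compensation is not retried: a compensation failure is already treated
    // as a critical, manually-handled event by the orchestrator.
    public Task CompensateAsync(TContext context) => _inner.CompensateAsync(context);
}
```

In production, a policy library such as Polly could replace this hand-rolled loop, and the exception filter should be narrowed to genuinely transient failure types.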