Automatic database transactions with Actix-Web and Sqlx

Valentin Cocaud

About Me

My name is Valentin Cocaud, EmrysMyrddin on GitHub. I'm a full-stack developer who loves to work with weird or underused technologies. One of my hobbies is building utilities and libraries to ease the use of databases in my projects. I know, I have weird passions.

Automatic Transactions in REST applications

Let's start at the beginning: what is this idea of automatic transactions?

In a REST application, each route has a handler which will probably need access to the database. To access the database, the handler needs a database client connected to it. When running locally on the developer's laptop, a single global connection opened at application startup is probably more than enough. But this strategy quickly shows its limits once deployed in production:

  • Parallel requests share the same client, which means that if one handler makes a slow query, all the others have to wait until it is done.
  • Parallel requests can interleave read and write queries, leading to race conditions (which are notoriously fun to debug).
  • A single handler making multiple serial writes can leave the database in a broken state if an error happens in the middle of the series of mutations.

All of these problems can be solved with a few techniques:

  • Use a connection pool instead of a single connection: a single request will not block all other ones because of a slow query.
  • Assign a client to each request: a single request will not starve the connection pool by throwing thousands of queries at once (yes, this can happen sometimes).
  • Use transactions to ensure database correctness.

All of this was standardized once upon a time, and used to be done automatically by the web framework (Spring, Hibernate, JPA? Ah, the good old days…). But it seems that the moment you leave the Java world, you have to do it yourself.

But don't be afraid: this task is in fact a pretty fun exercise when discovering a new language, which in my case is Rust.

In this blog post, we will go through the journey of achieving automatic transactions for a REST (or any HTTP) application. If you are only interested in the result, you can find all the code at the end of this article.

HTTP Server setup

Now that we know what our goal is, let's start our journey with a basic web server setup. I have chosen to stick with the most popular tools, to keep things simple.

Cargo.toml
[package]
name = "task-local-db"
version = "0.1.0"
edition = "2021"
 
[dependencies]
# HTTP framework
actix-web = "4.9.0"
# Env variable management via .env files
dotenv = "0.15.0"
# Serialization and Deserialization of data (used by sqlx and actix-web)
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
# SQL database client
sqlx = { version = "0.8.2", features = ["postgres", "runtime-tokio"] }
# Rust Async runtime
tokio = { version = "1.42.0", features = ["full"] }
# A logger compatible with actix-web
env_logger = "0.11.6"

In actix-web, writing a basic HTTP server is straightforward. We will start with this basic and very creative "hello world" setup and build upon it.

From here on, I will mostly show only the changes.

src/main.rs
use actix_web::middleware::Logger;
use actix_web::{App, HttpResponse, HttpServer, Responder};
use dotenv::dotenv;
use env_logger::Env;
 
#[actix_web::get("/hello_world")]
async fn hello_world() -> impl Responder {
    HttpResponse::Ok().json("Hello World 👋")
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init_from_env(Env::default().default_filter_or("info"));
    dotenv().ok();
 
    HttpServer::new(move || {
        App::new()
            .wrap(Logger::default())
            .service(hello_world)
    })
    .bind(("127.0.0.1", 8080))
    .expect("server must be created")
    .run()
    .await
}

Set up the database

Since all of this serves as a base for my own projects, I will set up a Postgres database, but you can of course use any database you want.

You can easily start a new database using a docker compose file. To follow my example, you can use this one:

docker-compose.yaml
services:
  db:
    image: postgres:latest
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: postgres
    ports:
      - "5432:5432"

We will use sqlx to query this database. I love this library because it allows you to write raw SQL queries while still getting type-safe structs back (either anonymous or custom ones). To achieve this, sqlx implements macros that need access to your database at compile time, which also means the tables referenced in your queries must already exist when you build. The best way to configure it is to use a .env file with your database URL:

.env
DATABASE_URL="postgres://postgres:admin@localhost/postgres"
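
Because the query! macro validates SQL against the live database at compile time, the test table used throughout this article has to exist before the project will even build. Here is a minimal one-off setup sketch; the schema and the idea of a separate setup binary are my own additions, and a real project would use sqlx migrations instead:

use sqlx::{Pool, Postgres};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    dotenv::dotenv().ok();
    let pool = Pool::<Postgres>::connect(&std::env::var("DATABASE_URL").unwrap()).await?;

    // The unchecked `sqlx::query` (no `!`) does no compile-time validation,
    // so this builds even while the table does not exist yet.
    sqlx::query("CREATE TABLE IF NOT EXISTS test (id SERIAL PRIMARY KEY, name TEXT)")
        .execute(&pool)
        .await?;

    Ok(())
}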

It is now time to set up the SQL connection pool. Luckily, sqlx comes with a Pool implementation!

src/main.rs
use sqlx::{query, Pool, Postgres};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init_from_env(Env::default().default_filter_or("info"));
    dotenv().ok();

    let pool = Pool::<Postgres>::connect(
        std::env::var("DATABASE_URL") // We can use the same env variable as the sqlx macros!
            .expect("Missing DATABASE_URL env variable")
            .as_str(),
    )
    .await
    .expect("failed to connect to the database");

    println!(
        "Wow! We can make SQL queries now: {:?}",
        query!("SELECT * FROM test").fetch_all(&pool).await.unwrap()
    );
 
    //...
}

Great! We can now make queries against our database! We just have to share this pool with our request handlers.

But in Rust, sharing state across threads is not that simple. Luckily for us, actix-web has a mechanism to share data with the entire application.

struct AppContext {
    pool: Pool<Postgres>,
}
 
#[actix_web::get("/hello_world")]
async fn hello_world(ctx: web::Data<AppContext>) -> impl Responder {
    println!(
        "Wow! We can make SQL queries now: {:?}",
        query!("SELECT * FROM test").fetch_all(&ctx.pool).await.unwrap()
    );
    HttpResponse::Ok().json("Hello World 👋")
}
 
async fn main() -> std::io::Result<()> {
    //...
 
    let app_context = web::Data::new(AppContext { pool });
 
    HttpServer::new(move || {
        App::new()
            .app_data(app_context.clone())
            .wrap(Logger::default())
            .service(hello_world)
    })
    .bind(("127.0.0.1", 8080))
    .expect("server must be created")
    .run()
    .await
}

We now have a solid base for a classic HTTP application. Most servers out there seem to be implemented this way. But we can go further!

Wrap requests in a transaction

The very first problem we want to tackle is that a request can starve the client pool if it makes too many SQL queries.

I hear you: if you can starve your pool with a single HTTP request, you've probably messed up somewhere. And I can't entirely disagree, but you know… sometimes things are what they are.

Anyway, this is a first step towards our final goal. Since a transaction is bound to a client, all queries of a request should be handled by the same client.

It doesn't sound that difficult to achieve. One could just acquire a client at the beginning of each request handler and pass it around as an argument to the functions that need it.

And you are right.

src/main.rs
#[actix_web::get("/hello_world")]
async fn hello_world(ctx: web::Data<AppContext>) -> impl Responder {
    let mut tx = ctx.pool.begin().await.unwrap();
    query!("INSERT INTO test(name) VALUES ('Valentin')")
        .execute(&mut *tx)
        .await
        .unwrap();
    tx.commit().await.unwrap();
    HttpResponse::Ok()
}

But it’s not fun.

And what if we forget to start a transaction? No, we want a transaction to automatically wrap every request!

For this, we will implement an actix-web middleware, which wraps the execution of every request. This middleware starts a transaction and then calls the next middleware in the chain. Transactions are automatically rolled back when they are dropped at the end of the function, which is great! This means we can't forget to roll back in case of an error. We just have to commit when we are done with our work.

pub async fn auto_transaction_middleware(
    req: ServiceRequest,
    next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
    let ctx = req.app_data::<web::Data<AppContext>>().unwrap().get_ref();
    let tr = ctx.pool.begin().await.unwrap();
    req.extensions_mut().insert(tr);
    next.call(req).await
}
 
#[actix_web::get("/hello_world")]
async fn hello_world(req: HttpRequest) -> impl Responder {
    let mut exts = req.extensions_mut();
    let tr = exts.get_mut::<Transaction<'static, Postgres>>().unwrap();
    query!("INSERT INTO test(name) VALUES ('Valentin')")
        .execute(&mut **tr)
        .await
        .unwrap();
    HttpResponse::Ok()
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    //...
        App::new()
            .app_data(app_context.clone())
            .wrap(Logger::default())
            .wrap(from_fn(auto_transaction_middleware)) // from_fn comes from actix_web::middleware
            .service(hello_world)
    //...
}

Hey! You said we had to commit the transaction, otherwise it would be rolled back automatically!

That's right, so let's try to commit. Oh no! commit takes ownership of the transaction, which we can't give it, since the transaction is owned by the request instance and we only hold a mutable reference to it!

src/main.rs
#[actix_web::get("/hello_world")]
async fn hello_world(req: HttpRequest) -> impl Responder {
    let mut exts = req.extensions_mut();
    let tr = exts.get_mut::<Transaction<'static, Postgres>>().unwrap();
    query!("INSERT INTO test(name) VALUES ('Valentin')")
        .execute(&mut **tr)
        .await
        .unwrap();
 
    tr.commit();
//  ^^ -------- `*tr` moved due to this method call
//  |
//  | --------- move occurs because `*tr` has type `Transaction<'_, Postgres>`, which does not implement the `Copy` trait
    HttpResponse::Ok()
}

Hold the transaction in a container

To fix this, we have to find a way to move the transaction out of the request instance. Let me introduce you to the beauty of the mem::replace function.

mem::replace lets you replace the value behind a mutable reference and returns the previous one. That's exactly what we want to do!
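
As a quick standalone illustration (not project code), here is mem::replace swapping a value out from behind a mutable reference:

fn main() {
    let mut slot = Some(String::from("a transaction"));

    // Take ownership of the old value while leaving a replacement in its place.
    let old = std::mem::replace(&mut slot, None);

    assert_eq!(old, Some(String::from("a transaction")));
    assert_eq!(slot, None);
}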

Of course, we can't really swap the transaction for a new one (that would totally defeat our initial goal of database safety). We have to wrap our transaction in a container that represents the presence or absence of a transaction. We could have used an Option for this, since that's exactly its purpose. But let's make our own enum instead, so that we can know whether the transaction has been committed or rolled back (you will thank me the first time you try to debug why your transaction is gone).

While we are at it, we can also implement the FromRequest trait of actix-web to ease the retrieval of the transaction in request handlers.

src/main.rs
#[derive(Debug)]
enum DBClient {
    Client(Transaction<'static, Postgres>),
    Committed,
    RolledBack,
}
 
#[derive(Clone, Debug)]
pub struct ClientContainer {
    client: Arc<Mutex<DBClient>>, // tokio::sync::Mutex, so the lock can be held across .await points
}
 
impl FromRequest for ClientContainer {
    type Error = Error;
 
    type Future = Ready<Result<Self, Error>>;
 
    fn from_request(
        req: &actix_web::HttpRequest,
        _payload: &mut actix_web::dev::Payload,
    ) -> Self::Future {
        ready(Ok(req
            .extensions()
            .get::<Self>()
            .expect("the auto-transaction middleware must be mounted")
            .to_owned()))
    }
}
 
pub async fn auto_transaction_middleware(
    req: ServiceRequest,
    next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
    let ctx = req.app_data::<web::Data<AppContext>>().unwrap().get_ref();
    let tr = ctx.pool.begin().await.unwrap();
    req.extensions_mut().insert(ClientContainer {
        client: Arc::new(Mutex::new(DBClient::Client(tr))),
    });
    next.call(req).await
}

Now we can get our client container directly from the request handler parameters (automatically extracted by actix-web using FromRequest), and use mem::replace to swap out its content on commit.

src/main.rs
#[actix_web::get("/hello_world")]
async fn hello_world(ClientContainer { client }: ClientContainer) -> impl Responder {
    let mut lock = client.lock().await;
    let db_client = lock.deref_mut();
    if let DBClient::Client(tr) = db_client {
        query!("INSERT INTO test(name) VALUES ('Valentin')")
            .execute(&mut **tr)
            .await
            .unwrap();
 
        if let DBClient::Client(tr) = mem::replace(db_client, DBClient::Committed) {
            tr.commit().await.unwrap()
        } else {
            unreachable!()
        }
        HttpResponse::Ok().body("OK !")
    } else {
        HttpResponse::InternalServerError().body("The transaction has already ended")
    }
}

As you can see, this has the downside of requiring a lot of boilerplate just to run a query and commit the transaction. We can do better!

The most annoying part is the querying itself: the nicest API would be a function that takes a closure. Sadly, this is very difficult to achieve because the client has to be passed as a mutable reference, and mixing futures with the lifetimes of reference parameters is not achievable (at the time of writing this article, a compiler bug disallows it).

I have worked around this by using a macro. It’s not perfect and hurts my feelings, but it works.

Let's implement this in a separate module, so that the main file stays readable.

src/auto_transactions.rs
use std::ops::DerefMut;
use std::sync::Arc;

use sqlx::{Postgres, Transaction};
use tokio::sync::Mutex;

// The DBClient enum from the previous section now lives in this module, marked `pub`.
#[derive(Debug)]
pub enum DBClient {
    Client(Transaction<'static, Postgres>),
    Committed,
    RolledBack,
}

#[derive(Debug)]
pub enum DBError {
    SQLError(#[allow(dead_code)] sqlx::Error),
    ClientCommitted,
    ClientRolledBack,
}

#[macro_export]
macro_rules! q2 {
  ($client:ident, $f:ident, $sql:expr $(,$a:expr)*) => {
      async {
          let mut db_client = $client.lock().await;
          let db_client = db_client.deref_mut();
          match db_client {
            $crate::auto_transactions::DBClient::Client(tx) => {
              sqlx::query!($sql $(, $a)*).$f(&mut **tx).await.map_err(|err| $crate::auto_transactions::DBError::SQLError(err))
            },
            $crate::auto_transactions::DBClient::Committed => Err($crate::auto_transactions::DBError::ClientCommitted),
            $crate::auto_transactions::DBClient::RolledBack => Err($crate::auto_transactions::DBError::ClientRolledBack),
          }
      }
  };
}
 
pub async fn commit(client: Arc<Mutex<DBClient>>) -> Result<(), DBError> {
    let mut client = client.lock().await;
    let client_ref = client.deref_mut();
 
    match client_ref {
        DBClient::Committed => Ok(()),
        DBClient::RolledBack => Err(DBError::ClientRolledBack),
        DBClient::Client(_) => {
            if let DBClient::Client(tx) = std::mem::replace(client_ref, DBClient::Committed) {
                tx.commit().await.map_err(|err| DBError::SQLError(err))
            } else {
                unreachable!("already checked in the match clause");
            }
        }
    }
}
 
pub async fn rollback(client: Arc<Mutex<DBClient>>) -> Result<(), DBError> {
    let mut client = client.lock().await;
    let client_ref = client.deref_mut();
 
    match client_ref {
        DBClient::RolledBack => Ok(()),
        DBClient::Committed => Err(DBError::ClientCommitted),
        DBClient::Client(_) => {
            if let DBClient::Client(tx) = std::mem::replace(client_ref, DBClient::RolledBack) {
                tx.rollback().await.map_err(|err| DBError::SQLError(err))
            } else {
                unreachable!("already checked in the match clause");
            }
        }
    }
}

Now the handler implementation is much more readable!

src/main.rs
#[actix_web::get("/hello_world")]
async fn hello_world(ClientContainer { client }: ClientContainer) -> impl Responder {
    let res = q2!(
        client,
        execute,
        "INSERT INTO test(name) VALUES ('Valentin')"
    )
    .await
    .unwrap();
    println!("{:?}", res);

    commit(client).await.unwrap();
    HttpResponse::Ok()
}

Task Local Variables

There is still one annoyance left: the client container has to be explicitly passed to every handler and every function that queries the database. Instead, we will use a feature of tokio (the async runtime used by actix-web) called task-local variables. They are the same thing as thread-local variables, but scoped to an async task instead of a thread.

With this feature, you can set up a context that is transparently available during the whole task execution, without any parameter drilling!
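
Here is a quick standalone illustration of tokio task-locals (not project code; REQUEST_ID is just an example variable):

tokio::task_local! {
    static REQUEST_ID: u32;
}

// Deep in the call stack, no parameter needed: the value is read from the task-local.
async fn deep_in_the_call_stack() -> u32 {
    REQUEST_ID.with(|id| *id)
}

#[tokio::main]
async fn main() {
    // Everything awaited inside `scope` sees REQUEST_ID = 42.
    let id = REQUEST_ID.scope(42, deep_in_the_call_stack()).await;
    assert_eq!(id, 42);
}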

Of course, this means we have to make sure accessing this variable is safe. The sqlx API requires a &mut *tx, so we need mutable access to the transaction, which is why it stays wrapped in an Arc<Mutex<…>>:

src/auto_transactions.rs
tokio::task_local! {
    pub static DB: Arc<Mutex<DBClient>>;
}

#[actix_web::get("/hello_world")]
async fn hello_world(ctx: web::Data<AppContext>) -> impl Responder {
    let tx = ctx.pool.begin().await.unwrap();
    DB.scope(Arc::new(Mutex::new(DBClient::Client(tx))), async move {
        //...
    })
    .await
}
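
As a sketch of where this leads (my own adaptation, assuming the DBClient enum and DBError type from the previous sections), the commit helper can now read the task-local instead of taking the container as a parameter:

use std::ops::DerefMut;

// Sketch only: same logic as the earlier commit helper, but the client comes
// from the task-local. Like any access through with(), this panics if it is
// called outside of a DB.scope(...).
pub async fn commit() -> Result<(), DBError> {
    // Clone the Arc out of the task-local so no borrow is held across .await.
    let client = DB.with(|db| db.clone());
    let mut client = client.lock().await;
    let client_ref = client.deref_mut();

    match client_ref {
        DBClient::Committed => Ok(()),
        DBClient::RolledBack => Err(DBError::ClientRolledBack),
        DBClient::Client(_) => {
            if let DBClient::Client(tx) = std::mem::replace(client_ref, DBClient::Committed) {
                tx.commit().await.map_err(DBError::SQLError)
            } else {
                unreachable!("already checked in the match clause");
            }
        }
    }
}

From there, the natural next step is to move the DB.scope call into the middleware, so handlers get automatic transactions without touching any database parameter at all.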
