Async Programming in Incan¶
Incan provides full async/await support powered by the Tokio runtime. This guide covers all async features available in Incan.
Async is import-activated
async and await are soft keywords: they become reserved keywords only after importing std.async
(for example import std.async or from std.async.time import sleep).
Coming from Python?
Incan's async model mirrors Python's asyncio — you'll find async def, await, task spawning, and timeouts all
work the same way.
The key difference is under the hood: Incan compiles to Rust and uses Tokio, giving you the familiar Python syntax with Rust-level async performance.
Quick Start¶
```
from std.async.time import sleep

async def fetch_data() -> str:
    await sleep(1.0)  # Wait 1 second
    return "data"

def main() -> None:
    # When async is used, main automatically gets #[tokio::main]
    println("Starting...")
    data = await fetch_data()
    println(f"Got: {data}")
```
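If you are coming from Python, the quick start maps almost line-for-line onto asyncio. A rough sketch, where `asyncio.run` plays the role of the automatic `#[tokio::main]` wrapper:

```python
import asyncio

async def fetch_data() -> str:
    # asyncio.sleep stands in for std.async.time.sleep
    await asyncio.sleep(0.01)
    return "data"

def main() -> None:
    # asyncio.run builds the event loop, runs the coroutine, and tears it
    # down again, roughly what the generated #[tokio::main] wrapper does
    print("Starting...")
    result = asyncio.run(fetch_data())
    print(f"Got: {result}")

main()
```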
Core Concepts¶
Cancellation Vocabulary¶
Async APIs in std.async document cancellation with four contract terms:
| Term | Meaning |
|---|---|
| `cancel-safe` | Cancelling a pending wait does not consume the value, acquire the resource, or otherwise complete the operation. |
| `cancel-safe-but-lossy` | Cancelling the wait does not complete the operation, but a value or queue position owned by that wait may be lost. |
| `not cancel-safe` | Cancelling a pending wait can break the operation's coordination contract or leave other participants waiting. |
| `durable once spawned` | Work continues after it is spawned unless it finishes or is explicitly aborted; dropping the handle detaches the work and loses the result. |
Async Functions¶
Declare async functions with async def:
```
from std.async.time import sleep

async def do_work() -> int:
    await sleep(0.5)
    return 42
```
Await¶
Use await to wait for an async operation:
```
import std.async

async def process() -> str:
    data = await fetch_data()
    result = await transform(data)
    return result
```
await is only valid inside async def bodies and async methods. Using await in an ordinary def or sync method is a type error.
Coming from Python?
Incan's await follows the same rules as in Python: it is disallowed outside an async scope.
Time Primitives¶
sleep¶
Pause execution for a duration:
```
from std.async.time import sleep, sleep_ms

async def demo() -> None:
    await sleep(1.5)     # Sleep 1.5 seconds
    await sleep_ms(500)  # Sleep 500 milliseconds
```
timeout¶
Run an operation with a time limit:
```
from std.async.time import timeout

async def demo() -> None:
    result = await timeout(5.0, slow_operation)
    match result:
        case Ok(value): println(f"Success: {value}")
        case Err(e): println("Operation timed out")
```
timeout() and timeout_ms() cancel the supplied future when the deadline expires. That is appropriate for ordinary request work where the timed-out operation should stop. If the work must keep running after the deadline path returns, spawn it first and use timeout_join() so the timeout result preserves the live JoinHandle.
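This is the same contract Python's `asyncio.wait_for` gives you. A sketch with a hypothetical `slow_operation`, showing that the timed-out future really is cancelled:

```python
import asyncio

cancelled = False

async def slow_operation() -> str:
    global cancelled
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        # the deadline cancelled this coroutine, matching timeout()'s contract
        cancelled = True
        raise

async def demo() -> str:
    try:
        # wait_for cancels the awaited future when the deadline expires
        return await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

result = asyncio.run(demo())
print(result, cancelled)  # timed out True
```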
timeout_join¶
Wait for spawned work without cancelling it when the deadline expires:
```
from std.async.task import spawn
from std.async.time import timeout_join, TimeoutJoinOutcome

handle = spawn(write_audit_event(event))
match await timeout_join(1.0, handle):
    case TimeoutJoinOutcome.Completed(_): println("audit event written")
    case TimeoutJoinOutcome.JoinFailed(err): println(err.message())
    case TimeoutJoinOutcome.TimedOut(live_handle):
        remember(live_handle)
```
Use timeout_join() for side-effecting work that must keep running once spawned, such as durable writes, protocol commits, and file flushes. On timeout, the task continues running and TimeoutJoinOutcome.TimedOut(handle) carries the live handle so you can await it later, store it in a task registry, or abort it deliberately. If an outer cancellation boundary cancels the timeout_join() call itself before it returns, the helper-owned handle is dropped and the task is detached unless you arranged another completion path. For spawn_blocking() handles, abort() can only prevent queued work from starting; blocking work that has already started must finish on its own.
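asyncio has no direct timeout_join() equivalent, but `asyncio.wait` with a timeout has the same handle-preserving shape: the deadline can pass without cancelling the task. A sketch with a made-up `write_audit_event`:

```python
import asyncio

async def write_audit_event() -> str:
    await asyncio.sleep(0.1)  # stands in for the durable write
    return "written"

async def demo() -> str:
    task = asyncio.create_task(write_audit_event())
    # Unlike wait_for, asyncio.wait does NOT cancel the task on timeout,
    # which mirrors timeout_join(): the live handle stays usable.
    done, pending = await asyncio.wait([task], timeout=0.01)
    if task in pending:
        # Deadline hit; the work kept running. Await it later.
        return await task
    return task.result()

print(asyncio.run(demo()))  # written
```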
Task Spawning¶
spawn¶
Run a task concurrently:
```
from std.async.task import spawn
from std.async.time import sleep

async def background_work() -> int:
    await sleep(2.0)
    return 42

# Spawn returns immediately
handle = spawn(background_work)

# Do other work...
println("Working on other things...")

# Wait for the spawned task
result = await handle
println(f"Background task returned: {result}")
```
Spawned tasks are durable once spawned. Dropping handle detaches the task and loses the result; it does not cancel the task. Use handle.abort() when an async task should be cancelled.
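Python tasks behave similarly in that cancellation is explicit: `task.cancel()` is the analogue of handle.abort(). A sketch:

```python
import asyncio

async def background_work() -> str:
    await asyncio.sleep(10)
    return "done"

async def demo() -> bool:
    task = asyncio.create_task(background_work())
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()           # analogue of handle.abort()
    try:
        await task
    except asyncio.CancelledError:
        return True         # the task was cancelled, not completed
    return False

print(asyncio.run(demo()))  # True
```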
spawn_blocking¶
Run CPU-intensive or blocking code on a dedicated thread:
```
from std.async.task import spawn_blocking

def heavy_computation() -> int:
    result = 0
    for i in range(1_000_000):
        result = result + i
    return result

# Won't block the async runtime
result = await spawn_blocking(heavy_computation)
```
spawn_blocking() is for bounded blocking work that eventually finishes on its own. Dropping its JoinHandle detaches the work and loses the result, and abort() cannot stop blocking work after it starts running. abort() can only prevent a queued blocking task from starting.
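The closest asyncio analogue is `asyncio.to_thread` (Python 3.9+), which moves the blocking call onto a worker thread so the event loop stays responsive:

```python
import asyncio

def heavy_computation() -> int:
    # plain synchronous CPU work
    return sum(range(1_000_000))

async def demo() -> int:
    # to_thread runs the function on a worker thread, so the
    # event loop is free to run other tasks in the meantime
    return await asyncio.to_thread(heavy_computation)

print(asyncio.run(demo()))  # 499999500000
```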
yield_now¶
Cooperatively yield to let other tasks run:
```
from std.async.task import yield_now

async def cooperative_loop() -> None:
    for i in range(10000):
        # Do some work...
        if i % 100 == 0:
            await yield_now()  # Let other tasks run
```
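In asyncio the conventional equivalent of yield_now() is `await asyncio.sleep(0)`, which suspends exactly once so other ready tasks can run. A small sketch that records the interleaving:

```python
import asyncio

order: list[str] = []

async def cooperative_loop() -> None:
    for i in range(3):
        order.append(f"loop-{i}")
        await asyncio.sleep(0)  # yield once so other ready tasks run

async def other_task() -> None:
    order.append("other")

async def demo() -> None:
    await asyncio.gather(cooperative_loop(), other_task())

asyncio.run(demo())
print(order)  # ['loop-0', 'other', 'loop-1', 'loop-2']
```

Without the `sleep(0)`, `other_task` would only run after the whole loop finished.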
Channels¶
Channels enable safe message passing between concurrent tasks. They're the primary way to communicate between async tasks without shared mutable state.
MPSC Channel (Multi-Producer, Single-Consumer)¶
MPSC stands for Multi-Producer, Single-Consumer:
- Multiple senders can send messages (clone the sender to share it)
- One receiver processes all messages
- Messages arrive in order (FIFO)
When to use MPSC¶
| Use Case | Why MPSC |
|---|---|
| Worker pool → result collector | Many workers, one aggregator |
| Event bus | Multiple event sources, one handler |
| Logging | Multiple tasks → single log writer |
| Request queue | Multiple clients → one server |
Bounded channel (recommended):
```
from std.async.channel import channel

# Create channel with buffer size 32
tx, rx = channel[str](32)

# Sender - blocks if buffer is full (backpressure)
async def producer() -> None:
    await tx.send("hello")  # Waits if buffer full
    await tx.send("world")

# Receiver - blocks until message available
async def consumer() -> None:
    while let Some(msg) = await rx.recv():
        println(f"Got: {msg}")
```
Cancellation contracts:
- `tx.send(value)` is cancel-safe-but-lossy. If it is cancelled while waiting for bounded-channel capacity, the value is not sent and is dropped.
- `tx.reserve()` waits for capacity before a value is committed. Use it for critical sends where cancellation must not drop the message value.
- `permit.send(value)` is synchronous and either delivers the value or returns it in `SendError[T]`.
- `rx.recv()` is cancel-safe. If it is cancelled while waiting, no message is removed from the channel.
- `try_send()` and `try_recv()` return immediately, so there is no pending wait to cancel.
Reserve capacity before building or moving a critical message:
```
match await tx.reserve():
    case Ok(permit):
        match permit.send("audit-event"):
            case Ok(_): println("sent")
            case Err(err): println(f"send failed: {err.value}")
    case Err(err): println(err.message())
```
Multiple producers (clone the sender):
```
from std.async.channel import channel
from std.async.task import spawn

tx, rx = channel[int](100)

# Clone sender for each producer
tx1 = tx.clone()
tx2 = tx.clone()

spawn(async () -> None:
    await tx1.send(1)
    await tx1.send(2)
)

spawn(async () -> None:
    await tx2.send(100)
    await tx2.send(200)
)

# Single consumer receives from all producers
async def consume() -> None:
    while let Some(n) = await rx.recv():
        println(f"Received: {n}")
```
Coming from Python?
Incan channels are inspired by Rust's tokio::sync::mpsc, not Python's asyncio.Queue. Key differences:
- Sender/Receiver split: You get a `(tx, rx)` pair — clone `tx` for multiple producers
- Ownership semantics: When all senders are dropped, the channel closes and `recv()` returns `None`
- No shared queue: Unlike `asyncio.Queue`, you can't just pass the queue around — you pass senders or the receiver
The Python equivalent would use asyncio.Queue with put()/get(), but lacks the automatic "channel closed" signal
when producers finish.
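To make the comparison concrete, here is that asyncio.Queue version, with a manual `None` sentinel standing in for the close signal that Incan channels provide automatically:

```python
import asyncio

async def demo() -> list[str]:
    queue = asyncio.Queue(maxsize=32)  # bounded, like channel[str](32)
    received = []

    async def producer() -> None:
        await queue.put("hello")
        await queue.put("world")
        # manual close sentinel; Incan's recv() returns None automatically
        # once every sender is dropped
        await queue.put(None)

    async def consumer() -> None:
        while (msg := await queue.get()) is not None:
            received.append(msg)

    await asyncio.gather(producer(), consumer())
    return received

print(asyncio.run(demo()))  # ['hello', 'world']
```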
Bounded vs Unbounded:
| Type | Backpressure | Memory Safety | Use When |
|---|---|---|---|
| `channel[T](n)` | Yes - send blocks when full | Bounded memory | Production code |
| `unbounded_channel[T]()` | No - send never blocks | Can grow forever | Prototyping, low-volume |
Unbounded Channel¶
send never blocks, but use with caution — if producers outpace consumers, memory grows without limit:
```
from std.async.channel import unbounded_channel

# No capacity limit - send always succeeds immediately
tx, rx = unbounded_channel[int]()

# These never block
tx.send(1)
tx.send(2)
tx.send(3)

# Consumer still blocks waiting for messages
match await rx.recv():
    case Some(n): println(f"Got: {n}")
    case None: println("Channel closed")
```
When unbounded is OK:
- Low message volume
- Consumers always faster than producers
- Prototyping (switch to bounded before production)
One-Shot Channel¶
A oneshot channel sends exactly one value. After sending, the sender is consumed and can't be used again. This is perfect for "request-response" patterns where you spawn a task and wait for its result.
When to use oneshot vs regular channel:
| Use Case | Channel Type |
|---|---|
| Stream of messages | channel[T] |
| Single result from spawned task | oneshot[T] |
| Producer-consumer queue | channel[T] |
| Async function return value | oneshot[T] |
Basic usage:
```
from std.async.channel import oneshot
from std.async.task import spawn

tx, rx = oneshot[int]()

spawn(async () -> None:
    result = expensive_computation()
    tx.send(result)  # Consumes sender - can only send once
)

# Wait for the result
match await rx.recv():
    case Ok(value): println(f"Got result: {value}")
    case Err(e): println("Sender dropped without sending")
```
The one-shot receiver is cancel-safe: cancelling rx.recv() does not consume the value. tx.send(value) is synchronous, so it either delivers the value or returns it immediately if the receiver is gone.
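The closest asyncio building block to oneshot is a `Future`: `set_result` may be called only once, and the waiter simply awaits it. A sketch:

```python
import asyncio

async def demo() -> int:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # one-shot slot for a single value

    def computation_done() -> None:
        # like tx.send(result); calling set_result twice would raise
        fut.set_result(42)

    loop.call_soon(computation_done)
    return await fut  # like await rx.recv()

print(asyncio.run(demo()))  # 42
```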
Common pattern - returning results from spawned tasks:
```
from std.async.channel import oneshot
from std.async.task import spawn

async def compute_in_background(input: Data) -> Result[Output, ComputeError]:
    # Create oneshot for the result
    tx, rx = oneshot[Result[Output, ComputeError]]()

    # Spawn the computation
    spawn(async () -> None:
        result = heavy_computation(input)
        tx.send(result)
    )

    # Do other work while computation runs...
    do_other_stuff()

    # Wait for result
    match await rx.recv():
        case Ok(result): return result
        case Err(_): return Err(ComputeError.TaskFailed)
```
Key differences from regular channels:
- `send()` consumes the sender (can only call once)
- No buffering - it's either empty or has exactly one value
- Lighter weight than MPSC channels
- `recv()` returns `Err` if sender was dropped without sending
Synchronization Primitives¶
Incan provides low-level synchronization primitives inspired by Rust's tokio runtime. These give you fine-grained control over shared state and task coordination.
Coming from Python?
Python's asyncio has some equivalents, but Incan's primitives work differently.
| Incan | Python asyncio | Key Difference |
|---|---|---|
| `Mutex[T]` | `asyncio.Lock` | Incan wraps the value; Python protects external data |
| `RwLock[T]` | (no equivalent) | Allows multiple readers OR single writer |
| `Semaphore` | `asyncio.Semaphore` | Similar behavior |
| `Barrier` | `asyncio.Barrier` (3.11+) | Similar behavior |
The key difference: Incan's Mutex[T] and RwLock[T] wrap a value (like Rust),
while Python's Lock is just a lock you use alongside your data.
Mutex¶
Mutual exclusion — ensures only one task can access the wrapped value at a time.
When to use: Multiple tasks need to both read AND write shared state.
API:
- `Mutex.new(value)` — Create a mutex wrapping a value
- `await mutex.lock()` — Acquire lock, returns a guard
- `guard.get()` — Read the value
- `guard.set(new_value)` — Write a new value
- Guard auto-releases when it goes out of scope
```
from std.async.sync import Mutex

shared_counter = Mutex.new(0)

async def increment() -> None:
    guard = await shared_counter.lock()  # Blocks until lock acquired
    current = guard.get()
    guard.set(current + 1)
    # Lock released when guard goes out of scope
```
mutex.lock() is cancel-safe-but-lossy: cancellation before the guard is returned does not acquire the lock, but the waiter loses its place in the fairness queue.
Python comparison
Python uses a lock separate from the data. Incan wraps the data (Mutex[T]), so the lock and value are kept together.
```
# Python - lock is separate from data
import asyncio

lock = asyncio.Lock()
counter = 0

async def increment():
    global counter
    async with lock:
        counter += 1
```

```
from std.async.sync import Mutex

# Incan - lock wraps the data
counter = Mutex.new(0)

async def increment() -> None:
    guard = await counter.lock()
    guard.set(guard.get() + 1)
```
RwLock¶
Reader-writer lock — allows multiple readers OR a single writer, but not both simultaneously. More efficient than Mutex when reads are frequent.
When to use: Many tasks read, few tasks write (e.g., configuration, caches).
API:
- `RwLock.new(value)` — Create an RwLock wrapping a value
- `await rwlock.read()` — Acquire read lock (shared with other readers)
- `await rwlock.write()` — Acquire write lock (exclusive)
- `guard.get()` — Read the value
- `guard.set(new_value)` — Write (only on write guard)
```
from std.async.sync import RwLock

config = RwLock.new(Config(debug=False))

async def read_config() -> bool:
    guard = await config.read()  # Multiple readers allowed simultaneously
    return guard.get().debug

async def update_config(debug: bool) -> None:
    guard = await config.write()  # Waits for all readers, then exclusive
    guard.set(Config(debug=debug))
```
rwlock.read() and rwlock.write() are cancel-safe-but-lossy: cancellation before a guard is returned does not acquire the lock, but the waiter loses its place in the fairness queue.
Coming from Python?
Python's asyncio doesn't have RwLock. This is a Rust/systems programming concept.
If you need read-heavy access patterns in Python, you typically just use a Lock for everything.
Semaphore¶
Counting semaphore — limits how many tasks can access a resource concurrently by managing a pool of "permits."
When to use: Connection pools, rate limiting, bounding concurrent operations.
API:
- `Semaphore.new(n)` — Create with n permits
- `await sem.acquire()` — Wait for and acquire a permit
- Permit auto-releases when it goes out of scope
```
from std.async.sync import Semaphore

# Allow max 3 concurrent connections
connection_limit = Semaphore.new(3)

async def make_request() -> Response:
    permit = await connection_limit.acquire()  # Blocks if no permits
    response = await http_get(url)
    # Permit released automatically when permit goes out of scope
    return response
```
sem.acquire() is cancel-safe-but-lossy: cancellation before a permit is returned does not acquire a permit, but the waiter loses its place in the fairness queue.
Python equivalent
asyncio.Semaphore(n) works the same way.
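For comparison, the same connection-limiting pattern with `asyncio.Semaphore`, tracking the peak number of concurrent holders to show the bound (names here are illustrative):

```python
import asyncio

async def demo() -> int:
    sem = asyncio.Semaphore(3)  # at most 3 concurrent holders
    active = 0
    peak = 0

    async def make_request() -> None:
        nonlocal active, peak
        async with sem:  # acquire a permit; released on block exit
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)
            active -= 1

    await asyncio.gather(*(make_request() for _ in range(10)))
    return peak

print(asyncio.run(demo()))  # 3
```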
Barrier¶
Synchronization point — makes N tasks wait until all of them reach the barrier before any can proceed.
When to use: Phased algorithms, coordinating startup, map-reduce patterns.
API:
- `Barrier.new(n)` — Create barrier for n tasks
- `await barrier.wait()` — Wait until all n tasks reach this point
```
from std.async.sync import Barrier

barrier = Barrier.new(3)  # Wait for 3 tasks

async def worker(id: int) -> None:
    println(f"Worker {id} starting phase 1")
    # ... do phase 1 work ...
    await barrier.wait()  # All 3 must reach here before anyone continues
    println(f"Worker {id} starting phase 2")  # All start phase 2 together
```
barrier.wait() is cancellation-aware before release: cancelling a pending wait withdraws that participant from the current generation and frees its slot. Remaining participants still need enough active arrivals to complete the generation, so workflows that allow independent participant cancellation should also define how replacement participants arrive or how the whole phase is abandoned. The returned slot is unique within a completed generation, but cancellation can reuse freed slots, so do not treat it as chronological arrival order.
Python equivalent
asyncio.Barrier(n) (added in Python 3.11) works the same way.
Complete Example¶
See examples/advanced/async_sync.incn for a runnable demo of all four primitives.
Select and Future Race¶
select_timeout¶
Simplified timeout returning Option:
```
from std.async.select import select_timeout

match await select_timeout(2.0, slow_operation):
    case Some(result): println(f"Got: {result}")
    case None: println("Timed out, using default")
```
select_timeout() has the same cancellation contract as timeout(): if the deadline wins, the supplied future is cancelled. Use it only when the timed-out future may be safely abandoned. For spawned work that must continue, use timeout_join() instead.
Future race syntax is planned as first-completion-wins composition. Losing race arms are cancelled, so loser arms must not contain side effects that are required for correctness after their final suspension point. Put required cleanup in cancellation-safe resources, or spawn durable work before the race and use a handle-preserving wait such as timeout_join() when you still need the result.
Runtime Integration¶
Automatic Tokio Main¶
When your program uses async features, the main() function is automatically wrapped with #[tokio::main]:
Tokio is Rust's async runtime: see the Tokio project for a high-level overview.
```
import std.async

def main() -> None:
    # This becomes async main() under the hood
    # when async primitives are used
    result = await fetch_data()
    println(f"Result: {result}")
```
Generated Dependencies¶
The compiler automatically adds tokio to your Cargo.toml:
```
[dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] }
```
Best Practices¶
1. Don't Block the Runtime¶
Avoid blocking operations in async code:
```
from std.async.time import sleep

# BAD: Blocks the async runtime
async def bad() -> None:
    std_thread_sleep(1.0)  # Don't do this!

# GOOD: Use async sleep
async def good() -> None:
    await sleep(1.0)
```
2. Use spawn_blocking for CPU Work¶
```
from std.async.task import spawn_blocking

# BAD: CPU work on async runtime
async def bad() -> int:
    return heavy_computation()

# GOOD: Offload to blocking pool
async def good() -> int:
    return await spawn_blocking(heavy_computation)
```
3. Use Bounded Channels¶
Prefer bounded channels to prevent memory issues:
```
from std.async.channel import channel, unbounded_channel

# Prefer this:
tx, rx = channel[Data](100)

# Over this (unbounded can grow forever):
tx, rx = unbounded_channel[Data]()
```
4. Handle Cancellation Explicitly¶
Dropping a task handle detaches the task and loses its result. It does not cancel the task:
```
from std.async.task import spawn
from std.async.time import sleep

async def cancellable_work() -> str:
    await sleep(10.0)
    return "done"

handle = spawn(cancellable_work)

# This detaches the task and loses the result; the task keeps running.
_ = handle
```
Use handle.abort() when an async task should be cancelled. For blocking work created with spawn_blocking(), abort() only has a chance to prevent execution before the blocking task starts.
Error Handling¶
Timeout Errors¶
```
from std.async.time import timeout

result = await timeout(1.0, slow_task)
match result:
    case Ok(value): process(value)
    case Err(e): println(f"Timed out: {e}")
```
Channel Closed¶
When sending fails because the receiver was dropped, SendError[T] contains the value that couldn't be sent in its .value field:
```
from std.async.channel import Sender

async def sender(tx: Sender[Message]) -> None:
    msg = Message(id=1, content="important data")
    match await tx.send(msg):
        case Ok(_): println("Sent successfully")
        case Err(e):
            # Recover the unsent value - it's not lost!
            println(f"Channel closed, saving for retry: {e.value:?}")
            save_for_later(e.value)
```
This pattern is important for reliability. If a channel closes unexpectedly, you can recover and handle the data that failed to send.
See Also¶
- Error handling - Concepts: `Result`, `Option`, `?`, `match`
- Error handling recipes - Patterns and best practices
- Error trait - Stdlib trait reference
- Examples: Async Tasks
- Examples: Channels
- Examples: Synchronization