use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, Condvar, atomic::{AtomicBool, AtomicUsize, Ordering}};
use std::thread;
use std::time::{Duration, Instant};
use std::io::{Read, Write};
use std::fs::File;
// ============================================================================
// COMPLETE MINIMAL ASYNC ABSTRACTION - The 5 Core Primitives
// ============================================================================
pub trait SystemIo: Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;
type File: FileHandle<Error = Self::Error>;
type Mutex<T: Send>: MutexHandle<T, Error = Self::Error>;
type CancellationToken: CancellationHandle;
type Waker: WakerHandle;
// 1. File I/O operations
fn file_open(&self, path: &str) -> Result<Self::File, Self::Error>;
fn file_create(&self, path: &str) -> Result<Self::File, Self::Error>;
// 2. Synchronization primitive
fn mutex_new<T: Send>(&self, value: T) -> Self::Mutex<T>;
// 3. Timing primitive
fn sleep(&self, duration: Duration);
// 4. Cancellation primitive
fn cancellation_token(&self) -> Self::CancellationToken;
// 5. Wakeup primitive - The missing piece!
fn waker(&self) -> Self::Waker;
fn park(&self, waker: &Self::Waker) -> Result<(), Self::Error>;
fn park_timeout(&self, waker: &Self::Waker, timeout: Duration) -> Result<bool, Self::Error>;
}
pub trait FileHandle: Send + 'static {
type Error: std::error::Error + Send + Sync + 'static;
fn read_to_string(&mut self) -> Result<String, Self::Error>;
fn write_all(&mut self, data: &[u8]) -> Result<(), Self::Error>;
}
pub trait MutexHandle<T: Send>: Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;
type Guard<'a>: std::ops::Deref<Target = T> + std::ops::DerefMut<Target = T> + 'a
where Self: 'a, T: 'a;
fn lock(&self) -> Result<Self::Guard<'_>, Self::Error>;
}
pub trait CancellationHandle: Send + Sync + Clone + 'static {
fn cancel(&self);
fn is_cancelled(&self) -> bool;
}
pub trait WakerHandle: Send + Sync + Clone + 'static {
fn wake(&self);
fn wake_by_ref(&self);
}
// ============================================================================
// BLOCKING IMPLEMENTATION - Using OS Threads + Synchronization
// ============================================================================
#[derive(Clone)]
pub struct BlockingIo;
pub struct BlockingFile(File);
pub struct BlockingMutex<T: Send>(Arc<Mutex<T>>);
pub struct BlockingCancellationToken(Arc<AtomicBool>);
pub struct BlockingWaker {
inner: Arc<(Mutex<bool>, Condvar)>,
id: usize,
}
// Global waker registry for thread parking
lazy_static::lazy_static! {
static ref WAKER_REGISTRY: Mutex<HashMap<thread::ThreadId, Arc<(Mutex<bool>, Condvar)>>> =
Mutex::new(HashMap::new());
static ref WAKER_COUNTER: AtomicUsize = AtomicUsize::new(0);
}
impl SystemIo for BlockingIo {
type Error = std::io::Error;
type File = BlockingFile;
type Mutex<T: Send> = BlockingMutex<T>;
type CancellationToken = BlockingCancellationToken;
type Waker = BlockingWaker;
fn file_open(&self, path: &str) -> Result<Self::File, Self::Error> {
Ok(BlockingFile(File::open(path)?))
}
fn file_create(&self, path: &str) -> Result<Self::File, Self::Error> {
Ok(BlockingFile(File::create(path)?))
}
fn mutex_new<T: Send>(&self, value: T) -> Self::Mutex<T> {
BlockingMutex(Arc::new(Mutex::new(value)))
}
fn sleep(&self, duration: Duration) {
thread::sleep(duration);
}
fn cancellation_token(&self) -> Self::CancellationToken {
BlockingCancellationToken(Arc::new(AtomicBool::new(false)))
}
fn waker(&self) -> Self::Waker {
let id = WAKER_COUNTER.fetch_add(1, Ordering::SeqCst);
BlockingWaker {
inner: Arc::new((Mutex::new(false), Condvar::new())),
id,
}
}
fn park(&self, waker: &Self::Waker) -> Result<(), Self::Error> {
let (lock, cvar) = &*waker.inner;
let mut woken = lock.lock().unwrap();
while !*woken {
woken = cvar.wait(woken).unwrap();
}
*woken = false; // Reset for next park
Ok(())
}
fn park_timeout(&self, waker: &Self::Waker, timeout: Duration) -> Result<bool, Self::Error> {
let (lock, cvar) = &*waker.inner;
let mut woken = lock.lock().unwrap();
if !*woken {
let result = cvar.wait_timeout(woken, timeout).unwrap();
woken = result.0;
if result.1.timed_out() {
return Ok(false); // Timed out
}
}
*woken = false; // Reset for next park
Ok(true) // Woken up
}
}
impl FileHandle for BlockingFile {
type Error = std::io::Error;
fn read_to_string(&mut self) -> Result<String, Self::Error> {
let mut content = String::new();
use std::io::Read;
self.0.read_to_string(&mut content)?;
Ok(content)
}
fn write_all(&mut self, data: &[u8]) -> Result<(), Self::Error> {
self.0.write_all(data)
}
}
impl<T: Send> MutexHandle<T> for BlockingMutex<T> {
type Error = std::io::Error;
type Guard<'a> = std::sync::MutexGuard<'a, T> where Self: 'a, T: 'a;
fn lock(&self) -> Result<Self::Guard<'_>, Self::Error> {
self.0.lock().map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "Mutex poisoned")
})
}
}
impl CancellationHandle for BlockingCancellationToken {
fn cancel(&self) {
self.0.store(true, Ordering::SeqCst);
}
fn is_cancelled(&self) -> bool {
self.0.load(Ordering::SeqCst)
}
}
impl Clone for BlockingCancellationToken {
fn clone(&self) -> Self {
BlockingCancellationToken(Arc::clone(&self.0))
}
}
impl WakerHandle for BlockingWaker {
fn wake(&self) {
let (lock, cvar) = &*self.inner;
let mut woken = lock.lock().unwrap();
*woken = true;
cvar.notify_one();
}
fn wake_by_ref(&self) {
self.wake();
}
}
impl Clone for BlockingWaker {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
id: self.id,
}
}
}
// ============================================================================
// HIGHER-LEVEL CONSTRUCTS - Built from the 5 Primitives
// ============================================================================
// Channel = Queue + Waker
pub struct Channel<T> {
queue: Arc<Mutex<VecDeque<T>>>,
waker: Arc<Mutex<Option<BlockingWaker>>>,
capacity: usize,
}
impl<T: Send + 'static> Channel<T> {
pub fn new(capacity: usize) -> (ChannelSender<T>, ChannelReceiver<T>) {
let shared = Arc::new(Channel {
queue: Arc::new(Mutex::new(VecDeque::new())),
waker: Arc::new(Mutex::new(None)),
capacity,
});
(
ChannelSender { inner: Arc::clone(&shared) },
ChannelReceiver { inner: shared },
)
}
}
pub struct ChannelSender<T> {
inner: Arc<Channel<T>>,
}
pub struct ChannelReceiver<T> {
inner: Arc<Channel<T>>,
}
impl<T> ChannelSender<T> {
pub fn send(&self, value: T) -> Result<(), &'static str> {
let mut queue = self.inner.queue.lock().unwrap();
if queue.len() >= self.inner.capacity {
return Err("Channel full");
}
queue.push_back(value);
// Wake up any waiting receiver
if let Some(waker) = self.inner.waker.lock().unwrap().as_ref() {
waker.wake();
}
Ok(())
}
}
impl<T> ChannelReceiver<T> {
pub fn recv<I: SystemIo<Waker = BlockingWaker>>(&self, io: &I) -> Result<T, I::Error> {
loop {
// Try to get item
{
let mut queue = self.inner.queue.lock().unwrap();
if let Some(item) = queue.pop_front() {
return Ok(item);
}
}
// No item available, park until woken
let waker = io.waker();
*self.inner.waker.lock().unwrap() = Some(waker.clone());
io.park(&waker)?;
}
}
}
// Semaphore = Counter + Waker
pub struct Semaphore {
permits: Arc<Mutex<usize>>,
waiters: Arc<Mutex<VecDeque<BlockingWaker>>>,
}
impl Semaphore {
pub fn new(permits: usize) -> Self {
Self {
permits: Arc::new(Mutex::new(permits)),
waiters: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn acquire<I: SystemIo<Waker = BlockingWaker>>(&self, io: &I) -> Result<SemaphoreGuard, I::Error> {
loop {
// Try to acquire permit
{
let mut permits = self.permits.lock().unwrap();
if *permits > 0 {
*permits -= 1;
return Ok(SemaphoreGuard {
semaphore: self,
_phantom: std::marker::PhantomData,
});
}
}
// No permits available, park until one is released
let waker = io.waker();
self.waiters.lock().unwrap().push_back(waker.clone());
io.park(&waker)?;
}
}
fn release(&self) {
let mut permits = self.permits.lock().unwrap();
*permits += 1;
// Wake up one waiter
if let Some(waker) = self.waiters.lock().unwrap().pop_front() {
waker.wake();
}
}
}
pub struct SemaphoreGuard<'a> {
semaphore: &'a Semaphore,
_phantom: std::marker::PhantomData<&'a ()>,
}
impl<'a> Drop for SemaphoreGuard<'a> {
fn drop(&mut self) {
self.semaphore.release();
}
}
// ============================================================================
// ENHANCED EXECUTOR - With All Primitives
// ============================================================================
pub struct Task<T> {
handle: thread::JoinHandle<T>,
cancel_token: BlockingCancellationToken,
}
impl<T> Task<T> {
pub fn join(self) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
self.handle.join().map_err(|_| "Task panicked".into())
}
pub fn cancel(&self) {
self.cancel_token.cancel();
}
pub fn is_cancelled(&self) -> bool {
self.cancel_token.is_cancelled()
}
}
pub struct Executor<I: SystemIo> {
io: Arc<I>,
}
impl<I: SystemIo> Executor<I> {
pub fn new(io: I) -> Self {
Self {
io: Arc::new(io),
}
}
pub fn spawn<F, T>(&self, task: F) -> Task<T>
where
F: FnOnce(&I, &I::CancellationToken) -> T + Send + 'static,
T: Send + 'static,
{
let io = Arc::clone(&self.io);
let cancel_token = self.io.cancellation_token();
let cancel_clone = cancel_token.clone();
let handle = thread::spawn(move || {
task(&*io, &cancel_clone)
});
Task { handle, cancel_token }
}
// Timeout = Sleep + Select + Cancellation
pub fn timeout<F, T>(&self, duration: Duration, task: F) -> Result<T, TimeoutError>
where
F: FnOnce(&I, &I::CancellationToken) -> T + Send + 'static,
T: Send + 'static,
{
let main_task = self.spawn(task);
let timeout_task = self.spawn(move |io, _cancel| {
io.sleep(duration);
TimeoutError
});
match self.select2(main_task, timeout_task) {
Either::Left(Ok(result)) => Ok(result),
Either::Left(Err(_)) => Err(TimeoutError),
Either::Right(_) => Err(TimeoutError),
}
}
pub fn join_all<T>(&self, tasks: Vec<Task<T>>) -> Vec<Result<T, Box<dyn std::error::Error + Send + Sync>>> {
tasks.into_iter().map(|t| t.join()).collect()
}
pub fn select2<A, B>(&self, task_a: Task<A>, task_b: Task<B>) -> Either<Result<A, Box<dyn std::error::Error + Send + Sync>>, Result<B, Box<dyn std::error::Error + Send + Sync>>> {
let (sender, receiver) = std::sync::mpsc::channel();
let sender_a = sender.clone();
let handle_a = thread::spawn(move || {
let result = task_a.join();
let _ = sender_a.send(Either::Left(result));
});
let sender_b = sender;
let handle_b = thread::spawn(move || {
let result = task_b.join();
let _ = sender_b.send(Either::Right(result));
});
let result = receiver.recv().unwrap();
// Cancel the losing task
match &result {
Either::Left(_) => task_b.cancel(),
Either::Right(_) => task_a.cancel(),
}
// Clean up threads
let _ = handle_a.join();
let _ = handle_b.join();
result
}
}
#[derive(Debug)]
pub enum Either<A, B> {
Left(A),
Right(B),
}
#[derive(Debug)]
pub struct TimeoutError;
impl std::fmt::Display for TimeoutError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Operation timed out")
}
}
impl std::error::Error for TimeoutError {}
// ============================================================================
// COLORLESS USER CODE - Complete Examples
// ============================================================================
pub fn producer_consumer_example<I: SystemIo<Waker = BlockingWaker>>(
executor: &Executor<I>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
I::Error: 'static,
{
println!("=== Producer/Consumer with Waker Example ===");
let (sender, receiver) = Channel::new(5);
let sender = Arc::new(sender);
let receiver = Arc::new(receiver);
// Producer task
let producer_sender = Arc::clone(&sender);
let producer = executor.spawn(move |io, cancel| -> Result<(), I::Error> {
for i in 0..10 {
if cancel.is_cancelled() {
println!("Producer cancelled at item {}", i);
return Ok(());
}
match producer_sender.send(format!("Item {}", i)) {
Ok(()) => println!("Produced: Item {}", i),
Err(e) => println!("Producer error: {}", e),
}
io.sleep(Duration::from_millis(100));
}
println!("Producer finished");
Ok(())
});
// Consumer task
let consumer_receiver = Arc::clone(&receiver);
let consumer = executor.spawn(move |io, cancel| -> Result<Vec<String>, I::Error> {
let mut consumed = Vec::new();
for _ in 0..10 {
if cancel.is_cancelled() {
println!("Consumer cancelled");
break;
}
match consumer_receiver.recv(io) {
Ok(item) => {
println!("Consumed: {}", item);
consumed.push(item);
}
Err(e) => {
println!("Consumer error: {:?}", e);
break;
}
}
}
println!("Consumer finished");
Ok(consumed)
});
let results = executor.join_all(vec![
producer,
consumer.map(|r| r.map(|items| format!("Consumed {} items", items.len()))),
]);
for (i, result) in results.iter().enumerate() {
match result {
Ok(msg) => println!("Task {} result: {:?}", i, msg),
Err(e) => println!("Task {} error: {}", i, e),
}
}
Ok(())
}
pub fn semaphore_example<I: SystemIo<Waker = BlockingWaker>>(
executor: &Executor<I>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
I::Error: 'static,
{
println!("\n=== Semaphore Example ===");
let semaphore = Arc::new(Semaphore::new(3)); // Only 3 concurrent operations
let mut tasks = Vec::new();
for task_id in 0..8 {
let sem = Arc::clone(&semaphore);
let task = executor.spawn(move |io, cancel| -> Result<String, I::Error> {
if cancel.is_cancelled() {
return Ok(format!("Task {} cancelled before start", task_id));
}
println!("Task {} waiting for semaphore...", task_id);
let _guard = sem.acquire(io)?;
if cancel.is_cancelled() {
return Ok(format!("Task {} cancelled after acquire", task_id));
}
println!("Task {} acquired semaphore, working...", task_id);
io.sleep(Duration::from_millis(200));
println!("Task {} finished work", task_id);
Ok(format!("Task {} completed", task_id))
});
tasks.push(task);
}
let results = executor.join_all(tasks);
for result in results {
match result {
Ok(Ok(msg)) => println!("✅ {}", msg),
Ok(Err(e)) => println!("❌ Task error: {:?}", e),
Err(e) => println!("❌ Task panic: {}", e),
}
}
Ok(())
}
pub fn timeout_example<I: SystemIo<Waker = BlockingWaker>>(
executor: &Executor<I>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
I::Error: 'static,
{
println!("\n=== Timeout Example ===");
// Fast task - should complete
match executor.timeout(Duration::from_millis(200), |io, cancel| {
println!("Fast task starting...");
io.sleep(Duration::from_millis(100));
if !cancel.is_cancelled() {
println!("Fast task completed!");
"Fast task result"
} else {
"Fast task cancelled"
}
}) {
Ok(result) => println!("✅ Fast task: {}", result),
Err(_) => println!("❌ Fast task timed out"),
}
// Slow task - should timeout
match executor.timeout(Duration::from_millis(200), |io, cancel| {
println!("Slow task starting...");
io.sleep(Duration::from_millis(500));
if !cancel.is_cancelled() {
println!("Slow task completed!");
"Slow task result"
} else {
println!("Slow task was cancelled");
"Slow task cancelled"
}
}) {
Ok(result) => println!("✅ Slow task: {}", result),
Err(_) => println!("❌ Slow task timed out (expected)"),
}
Ok(())
}
// ============================================================================
// DEMO MAIN
// ============================================================================
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("🚀 Complete Colorless Rust Async - The 5 Primitives\n");
println!("Core primitives: Mutex, Sleep, Select, Cancellation, Wakeup\n");
let blocking_io = BlockingIo;
let executor = Executor::new(blocking_io);
// Run all examples
producer_consumer_example(&executor)?;
semaphore_example(&executor)?;
timeout_example(&executor)?;
println!("\n✅ All examples completed!");
println!("\n🎯 This demonstrates the complete minimal async kernel:");
println!(" 1. Mutex (synchronization)");
println!(" 2. Sleep (timing)");
println!(" 3. Select (racing)");
println!(" 4. Cancellation (cleanup)");
println!(" 5. Wakeup (efficient waiting)");
println!("\nEverything else (channels, semaphores, timeouts) builds from these! 🎉");
Ok(())
}
/*
FINAL ANALYSIS - Complete Minimal Async Kernel:
✅ THE 5 CORE PRIMITIVES:
1. Mutex - Synchronization between tasks
2. Sleep - Time-based operations
3. Select - Racing multiple operations
4. Cancellation - Cleanup and early termination
5. Wakeup - Efficient task coordination
✅ HIGHER-LEVEL CONSTRUCTS BUILT FROM PRIMITIVES:
- Channels = Queue + Waker
- Semaphores = Counter + Waker
- Timeouts = Sleep + Select + Cancellation
- Any other async pattern!
✅ TRUE COLORLESS ASYNC:
- Same user code works with blocking or async backends
- Blocking backend uses threads + OS synchronization
- Async backend would use event loops + cooperative scheduling
- Identical semantics, different performance characteristics
✅ PRODUCTION-READY FEATURES:
- Proper cancellation with cooperative points
- Backpressure via semaphores
- Producer/consumer patterns via channels
- Timeout handling for reliability
- Resource cleanup via RAII guards
🚀 THIS IS THE FOUNDATION FOR COLORLESS ASYNC IN RUST!
Next step: Implement the AsyncIo backend using mio/epoll to prove
the same user code works with both execution models.
*/