Skip to content

Chapter 14 - Advanced Walker Patterns

Chapter 14: Advanced Language Features

This chapter explores Jac's advanced features for concurrent programming, its type system, and robust error handling for production systems. These features build upon the fundamentals to create applications that are both powerful and maintainable.

14.1 Concurrent Programming

spawn for Parallel Walkers

The spawn operator not only activates walkers but also enables natural concurrency. Multiple walkers can traverse the graph simultaneously. When they share data, the synchronization mechanisms shown later in this section coordinate their access:

// Basic parallel walker spawning
// Processes the node it is spawned on and reports the node id, how long the
// work took, and the name of the thread that ran it.
// NOTE(review): assumes `time`, `threading` and `sleep` are imported by the
// surrounding module (not shown in this example).
walker DataProcessor {
    has node_id: str;
    has processing_time: float;  // seconds to sleep, simulating work

    can process with entry {
        // Python's `time` module has no now(); time.monotonic() is the clock
        // intended for measuring elapsed time because it never jumps backwards.
        let start = time.monotonic();

        // Simulate processing
        sleep(self.processing_time);
        process_node_data(here);

        let duration = time.monotonic() - start;
        report {
            "node": self.node_id,
            "duration": duration,
            "thread": threading.current_thread().name
        };
    }
}

// Spawn multiple walkers concurrently
// Spawns one DataProcessor on each DataNode directly reachable from root.
// NOTE(review): assumes `random` is imported by the surrounding module.
with entry {
    // Outgoing neighbours of root, filtered to the DataNode type using the
    // [--> (`Type)] node-filter form.
    let nodes = [root -->(`DataNode)];

    // These walkers execute in parallel!
    // `node` is a Jac keyword (it declares node archetypes), so the loop
    // variable needs a different name.
    for i, target in enumerate(nodes) {
        spawn DataProcessor(
            node_id=f"node_{i}",
            processing_time=random.uniform(0.1, 0.5)
        ) on target;
    }
}

Advanced Spawning Patterns

// Parallel map-reduce pattern
// Applies `mapper` to one chunk of the input and merges the partial results
// into the shared collector node.
walker MapWorker {
    has mapper: callable;
    has data_chunk: list;
    has result_collector: node;

    can map with entry {
        // Process data chunk in parallel
        let results = [self.mapper(item) for item in self.data_chunk];

        // Send results to collector (thread-safe).
        // The collector is updated through the walker's own reference.
        // `here` is always the node the walker currently stands on (e.g. root,
        // where parallel_map_reduce spawns it), so writing through `here`
        // would put the results on the wrong node.
        :atomic: {  // Atomic operation on the shared collector
            self.result_collector.results.extend(results);
            self.result_collector.completed_chunks += 1;
        }
    }
}

// Waits on a ResultCollector until every map worker has reported in, then
// reduces the accumulated results and reports the final value.
// Raises TimeoutError if the map phase is not complete within max_wait.
walker ReduceWorker {
    has reducer: callable;
    has expected_chunks: int;
    has poll_interval: float = 0.01;  // seconds between completion checks
    has max_wait: float = 60.0;       // approximate upper bound on polling, seconds

    can reduce with ResultCollector entry {
        // Busy wait (in practice, use conditions). The wait is bounded so a
        // map worker that fails or never arrives cannot hang the reducer.
        waited = 0.0;
        while here.completed_chunks < self.expected_chunks {
            if waited >= self.max_wait {
                raise TimeoutError(
                    f"Only {here.completed_chunks} of {self.expected_chunks} chunks completed within {self.max_wait}s"
                );
            }
            sleep(self.poll_interval);
            waited += self.poll_interval;
        }

        // Reduce results
        final_result = self.reducer(here.results);
        report final_result;
    }
}

// Usage
// Splits `data` into chunks of `chunk_size` items, spawns one MapWorker per
// chunk, and then a ReduceWorker on the collector to combine the results.
// Raises ValueError when chunk_size is smaller than 1.
can parallel_map_reduce(
    data: list,
    mapper: callable,
    reducer: callable,
    chunk_size: int = 100
) -> any {
    // A step of 0 makes range() raise, and a negative step yields no chunks,
    // which would silently reduce an empty result set. Reject both up front.
    if chunk_size < 1 {
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}");
    }

    // Create result collector
    collector = root ++> ResultCollector(
        results=[],
        completed_chunks=0
    );

    // Split data and spawn mappers
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)];

    for chunk in chunks {
        spawn MapWorker(
            mapper=mapper,
            data_chunk=chunk,
            result_collector=collector
        ) on root;
    }

    // Spawn reducer
    result = spawn ReduceWorker(
        reducer=reducer,
        expected_chunks=len(chunks)
    ) on collector;

    return result;
}

Walker Synchronization

Jac provides several mechanisms for coordinating concurrent walkers:

// Barrier synchronization
// Holds arriving walkers until `required_count` of them have reached this
// node. The last arrival then releases the rest and resets the counters, so
// the barrier can be reused for the next round.
node BarrierNode {
    has required_count: int;
    has arrived_count: int = 0;
    has waiting_walkers: list = [];

    can wait_at_barrier with visitor entry {
        :synchronized: {  // Synchronized block
            self.arrived_count += 1;

            if self.arrived_count < self.required_count {
                // Not everyone is here yet: park this walker.
                // NOTE(review): suspend() is called while the synchronized
                // block is held; confirm it does not block here, or later
                // arrivals could never enter.
                self.waiting_walkers.append(visitor);
                visitor.suspend();  // Suspend walker execution
            } else {
                // The final arrival is never suspended; it wakes the others.
                // `walker` is a Jac keyword, so the loop variable is `waiting`.
                for waiting in self.waiting_walkers {
                    waiting.resume();  // Resume suspended walkers
                }
                // Reset for the next round.
                self.waiting_walkers.clear();
                self.arrived_count = 0;
            }
        }
    }
}

// Pipeline pattern with synchronized stages
// Applies this stage's process_func to a WorkItem's data and records the
// stage number in the item's completed_stages. It then moves on to
// next_stage, or reports the final data when there is no next stage.
walker PipelineStage {
    has stage_num: int;
    has process_func: callable;
    has next_stage: node?;  // an unset/empty next_stage marks the final stage

    can process with WorkItem entry {
        // Process work item; the result replaces the item's data in place
        result = self.process_func(here.data);
        here.data = result;

        // Mark stage completion
        here.completed_stages.add(self.stage_num);

        // Move to next stage if available
        // NOTE(review): the walker keeps this stage's process_func and
        // stage_num after it moves, and `process` only fires on WorkItem
        // nodes. Confirm how next_stage is meant to run its own function.
        if self.next_stage {
            :synchronized: {
                // Intended to ensure ordering for the pipeline.
                // NOTE(review): confirm that synchronizing the visit call
                // actually orders stage execution.
                visit self.next_stage;
            }
        } else {
            report here.data;  // Final result
        }
    }
}

async/await Patterns

Jac fully supports asynchronous programming for I/O-bound operations:

import:py asyncio;
import:py aiohttp;
import:py from asyncio { gather, create_task }

// Async walker abilities
// Fetches every URL concurrently, with at most max_concurrent requests in
// flight. Reports a dict mapping each URL to its status/length/title, or to
// an {"error": ...} entry.
walker AsyncWebCrawler {
    has urls: list[str];
    has max_concurrent: int = 10;
    has results: dict = {};

    async can crawl with entry {
        // Create semaphore for rate limiting
        semaphore = asyncio.Semaphore(self.max_concurrent);

        // One ClientSession for the whole crawl. A session owns a connection
        // pool, so opening one per request throws away connection reuse.
        async with aiohttp.ClientSession() as session {
            // Create tasks for all URLs
            tasks = [
                create_task(self.fetch_url(url, semaphore, session))
                for url in self.urls
            ];

            // Wait for all to complete before the session is closed
            await gather(*tasks);
        }

        report self.results;
    }

    // Fetches one URL and records the outcome in self.results.
    // Any exception is stored as {"error": message} rather than propagated,
    // so one bad URL does not abort the crawl.
    // `session` is optional for backward compatibility. When it is omitted,
    // a short-lived session is opened for this request only.
    async can fetch_url(
        url: str,
        semaphore: asyncio.Semaphore,
        session: aiohttp.ClientSession? = None
    ) -> None {
        async with semaphore {  // Limit concurrent requests
            try {
                if session is None {
                    async with aiohttp.ClientSession() as own_session {
                        await self._record(url, own_session);
                    }
                } else {
                    await self._record(url, session);
                }
            } except Exception as e {
                self.results[url] = {"error": str(e)};
            }
        }
    }

    // Performs the GET request and stores status, body length and title.
    async can _record(url: str, session: aiohttp.ClientSession) -> None {
        async with session.get(url) as response {
            content = await response.text();
            self.results[url] = {
                "status": response.status,
                "length": len(content),
                "title": extract_title(content)
            };
        }
    }
}

// Async node abilities
// Serves data to visiting AsyncClient walkers. Each API response is cached
// for cache_ttl seconds, keyed by the visitor's query parameters.
node AsyncDataSource {
    has api_endpoint: str;
    has cache: dict = {};
    has cache_ttl: int = 300;  // seconds

    async can fetch_data with AsyncClient entry {
        // NOTE(review): query_params is used as a dict key, so it must be
        // hashable (e.g. a tuple, not a dict). Confirm with AsyncClient.
        cache_key = visitor.query_params;

        // Serve from cache while the entry is fresh. Python's `time` module
        // has no now(); time.monotonic() is used because TTL arithmetic needs
        // a clock that cannot jump backwards.
        if cache_key in self.cache {
            cached_data, timestamp = self.cache[cache_key];
            if time.monotonic() - timestamp < self.cache_ttl {
                visitor.receive_data(cached_data);
                return;
            }
            // Expired: drop it so stale entries do not accumulate.
            del self.cache[cache_key];
        }

        // Fetch fresh data
        try {
            data = await fetch_from_api(
                self.api_endpoint,
                visitor.query_params
            );

            // Update cache
            self.cache[cache_key] = (data, time.monotonic());
            visitor.receive_data(data);

        } except APIError as e {
            visitor.receive_error(e);
        }
    }
}

// Async entry point
// Crawls three example URLs with AsyncWebCrawler and prints how many
// results came back.
async with entry:main {
    // Spawn async walker
    crawler = AsyncWebCrawler(urls=[
        "https://example.com",
        "https://example.org",
        "https://example.net"
    ]);

    // NOTE(review): this treats the awaited spawn as the walker's reported
    // value (the results dict). Confirm what `spawn` evaluates to here.
    results = await spawn crawler on root;
    print(f"Crawled {len(results)} URLs");
}

Thread-Safe Graph Operations

When multiple walkers operate on the same region of the graph, shared state must be protected. Jac provides locks, read-write locks, and atomic fields for this purpose:

// Thread-safe node with fine-grained locking
// A counter whose increments and reads both go through self.lock. Each
// increment also links a CounterLog node with the old and new values and
// the id of the visiting walker.
node ConcurrentCounter {
    has count: int = 0;
    // Intended as a node-level lock.
    // NOTE(review): if `has` defaults are evaluated once, every
    // ConcurrentCounter would share this single Lock. Confirm that the lock
    // is initialized per instance.
    has :lock: lock = threading.Lock();

    // Increments count for each visiting walker and logs the change, all
    // while holding the lock.
    can increment with visitor entry {
        with self.lock {
            old_value = self.count;
            self.count += 1;

            // Log the change atomically (still inside the lock)
            self ++> CounterLog(
                timestamp=now(),
                old_value=old_value,
                new_value=self.count,
                walker_id=visitor.id
            );
        }
    }

    // Returns the current count, read under the lock.
    can get_value -> int {
        with self.lock {
            return self.count;
        }
    }
}

// Read-write lock pattern
// A key/value store that allows concurrent reads but exclusive writes.
// NOTE(review): Python's standard `threading` module has no RWLock class.
// This needs a custom or third-party reader-writer lock implementation.
node SharedResource {
    has data: dict = {};
    has :rwlock: rwlock = threading.RWLock();

    // Returns the value stored under `key`, or None if the key is absent.
    can read_data(key: str) -> any? {
        with self.rwlock.read() {  // Multiple readers allowed
            return self.data.get(key);
        }
    }

    // Stores `value` under `key`, replacing any previous value.
    can write_data(key: str, value: any) {
        with self.rwlock.write() {  // Exclusive write access
            self.data[key] = value;
        }
    }
}

// Atomic operations on edges
// An edge whose weight decays by 1% on every traversal. Once the weight
// drops below 0.1, the edge is marked for deletion.
edge ConcurrentEdge {
    has weight: float;
    has access_count: int = 0;

    :atomic: ["weight", "access_count"];  // Declare atomic fields

    can traverse with visitor entry {
        # Each field update below is atomic per the :atomic: declaration.
        # NOTE(review): the increment, the decay and the threshold check are
        # separate steps, so another walker may interleave between them.
        self.access_count += 1;
        self.weight *= 0.99;  # Decay weight by 1% per traversal

        if self.weight < 0.1 {
            # Mark for deletion (thread-safe)
            self.mark_for_deletion();
        }
    }
}

14.2 Type System Deep Dive

Type Inference vs Explicit Typing

Jac requires type annotations on declarations such as has fields and function signatures, but it infers types in many other contexts:

// Explicit typing (required for declarations)
let numbers: list[int] = [1, 2, 3, 4, 5];
// DataProcessor declares node_id and processing_time without defaults, so
// both must be passed to its constructor.
let processor: DataProcessor = DataProcessor(node_id="node_0", processing_time=0.1);

// Type inference in expressions.
// Lists have no .map()/.filter() methods. The built-in map()/filter() are
// wrapped in list() so the results are lists rather than lazy iterators.
let doubled = list(map(lambda x: int -> int : x * 2, numbers));  // Inferred: list[int]
let filtered = list(filter(lambda x: int -> bool : x > 5, doubled));  // Inferred: list[int]

// Generic type inference
can identity[T](value: T) -> T {
    return value;
}

let x = identity(42);  // T inferred as int
let y = identity("hello");  // T inferred as str

// Complex type inference: applies `transformer` to every element of `data`.
can process_data[T, R](
    data: list[T],
    transformer: callable[[T], R]
) -> list[R] {
    return [transformer(item) for item in data];
}

// Usage with inference
let strings = ["1", "2", "3"];
let integers = process_data(strings, int);  // Inferred: list[int]

Generic Types and Constraints

Jac supports generic programming with type constraints:

// Basic generics
// An append-only list of T with bounds-checked positional access.
obj Container[T] {
    has items: list[T] = [];

    // Appends `item` after the existing items.
    can add(item: T) {
        self.items.append(item);
    }

    // Returns the item at `index`, or None when the index is negative or
    // past the end. Python-style negative indexing is not supported.
    can get(index: int) -> T? {
        if index < 0 or index >= len(self.items) {
            return None;
        }
        return self.items[index];
    }
}

// Generic constraints: T must support ordering comparisons.
// Returns a new ascending list; the caller's list is left unchanged.
can sort_comparable[T: Comparable](items: list[T]) -> list[T] {
    let ordered: list[T] = list(items);
    ordered.sort();
    return ordered;
}

// Multiple type parameters with constraints
// A time-to-live cache: each value is kept for `ttl` seconds after it is set.
obj Cache[K: Hashable, V] {
    // The required field is declared first. Assuming dataclass-style `has`
    // initializers, a field without a default may not follow one with a
    // default.
    has ttl: float;  // entry lifetime in seconds
    has store: dict[K, tuple[V, float]] = {};

    // Stores `value` under `key`, stamped with the current monotonic time.
    // Python's `time` module has no now(); monotonic time never jumps
    // backwards, so TTL checks stay correct across clock adjustments.
    can set(key: K, value: V) {
        self.store[key] = (value, time.monotonic());
    }

    // Returns the value for `key` if it was set less than `ttl` seconds ago.
    // Expired entries are removed, and None is returned for them.
    can get(key: K) -> V? {
        if key in self.store {
            value, timestamp = self.store[key];
            if time.monotonic() - timestamp < self.ttl {
                return value;
            }
            del self.store[key];
        }
        return None;
    }
}

// Bounded generics
// On nodes of type N that pass node_filter, processes the node and then
// follows connections whose edge is of type E and passes edge_filter.
walker TypedTraverser[N: node, E: edge] {
    has node_filter: callable[[N], bool];
    has edge_filter: callable[[E], bool];

    can traverse with N entry {
        if self.node_filter(here) {
            // Process node
            process_typed_node(here);

            // Traverse filtered edges (both directions).
            // NOTE(review): confirm that [<-->] yields edge objects here.
            // isinstance(e, E) and e.target below assume it does.
            let valid_edges = [e for e in [<-->]
                              if isinstance(e, E) and self.edge_filter(e)];

            // `edge` is a Jac keyword (it declares edge archetypes), so it
            // cannot be used as the loop variable.
            for e in valid_edges {
                visit e.target;
            }
        }
    }
}

Type-Safe Graph Operations

Jac's type system extends to graph operations, ensuring type safety in topological programming:

// Typed node references
// Base node type carrying a string payload.
node TypedNode {
    has data: str;
}

// Subtype of TypedNode that adds an integer payload; it inherits `data`.
node SpecialNode(TypedNode) {
    has special_data: int;
}

// Type-safe traversal
// Demonstrates typed node filters: each list collects outgoing neighbours
// filtered by node type, and its annotation must match that filter.
walker StrictTraverser {
    can process with entry {
        // Type-checked at compile time
        let typed_nodes: list[TypedNode] = [-->(`TypedNode)];
        let special_nodes: list[SpecialNode] = [-->(`SpecialNode)];

        // This would be a compile error, because a TypedNode filter is not
        // guaranteed to yield only SpecialNode instances:
        // let wrong: list[SpecialNode] = [-->(`TypedNode)];
    }
}

// Generic graph algorithms
// Searches outward from `start` for `end`, stepping only onto nodes accepted
// by `filter_func`. Returns the nodes from start to end (both included), or
// None when end is not reachable.
can find_path[N: node](
    start: N,
    end: N,
    filter_func: callable[[N], bool] = lambda n: N -> bool : True
) -> list[N]? {

    // Records, for every discovered node, the node it was reached from, and
    // rebuilds the path from those links once the target is found. Unlike
    // appending to and popping from one shared path list around `visit`,
    // this stays correct whether visits are queued or run immediately.
    // NOTE(review): assumes node objects are hashable (usable as dict keys).
    walker PathFinder[N] {
        has target: N;
        has filter_func: callable[[N], bool];
        has path: list[N] = [];
        has found: bool = False;
        has parents: dict = {};  // discovered node -> predecessor (start -> None)

        can search with N entry {
            if here == self.target {
                // Follow predecessor links back to start, then reverse.
                let trail: list[N] = [];
                let cur = here;
                while cur is not None {
                    trail.append(cur);
                    cur = self.parents.get(cur);
                }
                self.path = list(reversed(trail));
                self.found = True;
                report self.path;
                disengage;
            }

            // Lists have no .filter() method, so both checks are applied in
            // the loop. A node already in `parents` was discovered earlier.
            for nxt in [-->(`N)] {
                if nxt not in self.parents and self.filter_func(nxt) {
                    self.parents[nxt] = here;
                    visit nxt;
                }
            }
        }
    }

    let finder = PathFinder[N](
        target=end,
        filter_func=filter_func,
        parents={start: None}
    );

    spawn finder on start;
    return finder.path if finder.found else None;
}

Advanced Type Features

// Union types
type StringOrInt = str | int;
type MaybeNode = node | None;

// Returns the string form of a StringOrInt value.
can process_mixed(value: StringOrInt) -> str {
    // Class patterns need parentheses. A bare `case str as s` is a capture
    // pattern: it matches anything and rebinds the name `str`.
    match value {
        case str() as s: return s;
        case int() as i: return str(i);
    }
}

// Type aliases for complex types
type UserGraph = dict[str, list[tuple[User, Relationship]]];
type AsyncCallback = callable[[any], Awaitable[None]];

// Literal types
// NOTE(review): in Python typing, a union of literal values is written
// Literal["north", "south", ...]. Evaluated as a plain expression,
// 1 | 2 | 3 | 4 | 5 is a bitwise OR (== 7). Confirm that Jac accepts this
// bare-literal form as a type.
type Direction = "north" | "south" | "east" | "west";
type Priority = 1 | 2 | 3 | 4 | 5;

// Converts a compass direction and a step count into a relative Position.
// Raises ValueError for any other direction. Without the catch-all case,
// such a value would fall through and return None despite the Position
// return type.
can move(direction: Direction, steps: int) -> Position {
    match direction {
        case "north": return Position(0, steps);
        case "south": return Position(0, -steps);
        case "east": return Position(steps, 0);
        case "west": return Position(-steps, 0);
        case _: raise ValueError(f"Unknown direction: {direction}");
    }
}

// Protocol types (structural typing)
// Any type that provides these two abilities satisfies Serializable,
// without declaring that it implements it.
protocol Serializable {
    can to_json() -> str;  // encode this object as a JSON string
    can from_json(data: str) -> Self;  // build an instance from a JSON string
}

// Writes item.to_json() to `filename`, overwriting any existing file.
// The parameter is named `item` because `obj` is a Jac keyword (it declares
// object archetypes) and cannot be used as a parameter name.
can save_object[T: Serializable](item: T, filename: str) {
    // Explicit UTF-8 so the output does not depend on the platform's default
    // text encoding.
    with open(filename, "w", encoding="utf-8") as f {
        f.write(item.to_json());
    }
}

// Variadic generics
// Returns `values` unchanged. *Ts lets the checker keep each element's own
// type instead of widening the tuple to a single element type.
can combine[*Ts](values: tuple[*Ts]) -> tuple[*Ts] {
    return values;
}

let combined = combine((1, "hello", 3.14, True));  // tuple[int, str, float, bool]

14.3 Error Handling

Exception Handling in Traversals

Error handling in graph traversal requires special consideration, because a failure at one node should not necessarily abandon the rest of the traversal:

// Traversal-aware exception handling
// Visits every reachable node. Per-node processing errors are recorded
// instead of aborting, unless continue_on_error is False. A summary is
// reported when the walker leaves root.
walker ResilientTraverser {
    has errors: list[dict] = [];
    has continue_on_error: bool = True;

    can traverse with entry {
        try {
            // Process current node (its result is not needed here)
            process_node(here);

            // Continue traversal
            visit [-->];

        } except NodeProcessingError as e {
            self.errors.append({
                "node": here,
                "error": str(e),
                "traceback": get_traceback()
            });

            if self.continue_on_error {
                // Continuing must still reach the failed node's successors.
                // Without this visit, everything below it would be skipped.
                visit [-->];
            } else {
                disengage;  // Stop traversal
            }

        } except NetworkError as e {
            // Handle cross-machine traversal errors
            print(f"Network error visiting remote node: {e}");

            // Try alternate path
            visit [-->:LocalEdge:];  // Only local edges
        }
    }

    can summarize with `root exit {
        if self.errors {
            report {
                "status": "completed_with_errors",
                "error_count": len(self.errors),
                "errors": self.errors
            };
        } else {
            report {"status": "success"};
        }
    }
}

Ability-Specific Error Patterns

Different ability types require different error handling strategies:

// Node ability error handling
// A data node that transforms its data for authorized DataWalker visitors,
// counts transform failures, and logs every visit in a VisitLog node.
node DataNode {
    has data: dict;
    has error_count: int = 0;
    has max_errors: int = 3;  # failure count at which the node is marked failed

    can process_visitor with DataWalker entry {
        try {
            # Validate visitor
            if not visitor.is_authorized() {
                raise UnauthorizedError("Visitor not authorized");
            }

            # Process data and hand the result to the visitor
            result = transform_data(self.data, visitor.transform_spec);
            visitor.receive_result(result);

        } except UnauthorizedError as e {
            # Specific handling for auth errors: log, then reject the visitor
            log_security_event(visitor, e);
            visitor.reject(reason=str(e));

        } except DataTransformError as e {
            # Handle processing errors
            self.error_count += 1;

            # At the threshold the node is marked failed and the error is
            # escalated. In that case the visitor does not get receive_error.
            if self.error_count >= self.max_errors {
                self.mark_as_failed();
                raise NodeFailureError(f"Node failed after {self.error_count} errors");
            }

            visitor.receive_error(e);

        } finally {
            # Always log visit, including when NodeFailureError propagates
            self ++> VisitLog(
                visitor_type=type(visitor).__name__,
                timestamp=now(),
                success=not visitor.has_error()
            );
        }
    }
}

// Walker ability error handling
// Migrates every reachable node from source_version to target_version.
// On a MigrationError it does one of two things:
//   - rollback_on_error: restores the savepoint taken before the first node
//     was migrated, reports the failure, and stops;
//   - otherwise: records the error and keeps traversing.
walker DataMigrator {
    has source_version: int;
    has target_version: int;
    has rollback_on_error: bool = True;
    has migrated_nodes: list[node] = [];
    # Errors collected when rollback_on_error is False.
    has errors: list[dict] = [];
    # Graph savepoint taken once, when the walker enters its first node.
    has savepoint: any = None;

    can migrate with entry {
        # Create the savepoint for rollback only on the first node. Taking a
        # new one on every entry would make a rollback undo just this node.
        if self.savepoint is None {
            self.savepoint = create_graph_savepoint();
        }

        try {
            # Check version compatibility
            # NOTE(review): this is only handled below if VersionMismatchError
            # subclasses MigrationError. Confirm the exception hierarchy.
            if here.version != self.source_version {
                raise VersionMismatchError(
                    f"Expected v{self.source_version}, found v{here.version}"
                );
            }

            # Perform migration
            migrate_node_data(here, self.target_version);
            here.version = self.target_version;
            self.migrated_nodes.append(here);

            # Continue to connected nodes
            visit [-->];

        } except MigrationError as e {
            print(f"Migration failed at {here}: {e}");

            if self.rollback_on_error {
                # Rollback all migrated nodes
                restore_graph_savepoint(self.savepoint);

                # Report failure
                report {
                    "status": "failed",
                    "error": str(e),
                    "rolled_back": True,
                    "attempted_nodes": len(self.migrated_nodes)
                };

                disengage;
            } else {
                # Continue despite error: record it, then keep traversing
                self.errors.append({
                    "node": here,
                    "error": e
                });
                visit [-->];
            }
        }
    }
}

Distributed Error Propagation

Handling errors across machine boundaries requires special consideration: remote calls can time out, and remote machines can fail outright:

// Cross-machine error handling
// Processes the result of following the first RemoteEdge from the current
// node. Timeouts are retried with exponential backoff, and a failed remote
// machine is routed around when an alternate route exists.
walker DistributedProcessor {
    has timeout: float = 30.0;
    has retry_attempts: int = 3;

    can process with entry {
        # Nothing to do without a remote edge. Indexing [0] on an empty
        # list would raise IndexError.
        remote_edges = [-->:RemoteEdge:];
        if not remote_edges {
            return;
        }

        for attempt in range(self.retry_attempts) {
            try {
                # Set timeout for cross-machine operations
                with timeout_context(self.timeout) {
                    # This might cross machine boundaries
                    remote_result = visit_and_get_result(remote_edges[0]);

                    process_remote_result(remote_result);
                    break;  # Success, exit retry loop
                }

            } except TimeoutError {
                if attempt < self.retry_attempts - 1 {
                    # Exponential backoff: 1s, 2s, 4s, ...
                    wait_time = 2 ** attempt;
                    print(f"Timeout, retrying in {wait_time}s...");
                    sleep(wait_time);
                } else {
                    # Final attempt failed
                    raise DistributedOperationError(
                        f"Operation timed out after {self.retry_attempts} attempts"
                    );
                }

            } except RemoteMachineError as e {
                # Remote machine failure
                handle_machine_failure(e.machine_id);

                # Try alternate route. Once traffic is rerouted, retrying the
                # failed machine is pointless, so leave the retry loop.
                alternate = find_alternate_route(here, e.failed_node);
                if alternate {
                    visit alternate;
                    break;
                } else {
                    raise NoAlternateRouteError();
                }
            }
        }
    }
}

// Circuit breaker pattern for distributed calls
// Guards calls to an unreliable dependency.
//   - After failure_threshold consecutive failures, the circuit opens and
//     calls fail fast with CircuitOpenError.
//   - Once recovery_timeout seconds have passed since the last failure, one
//     trial call is allowed through (half-open).
//   - A successful trial closes the circuit; a failed one reopens it.
obj CircuitBreaker {
    has failure_threshold: int = 5;
    has recovery_timeout: float = 60.0;  # seconds
    has failure_count: int = 0;
    has last_failure_time: float = 0.0;
    has state: str = "closed";  # closed, open, half-open

    can call[T](func: callable[[], T]) -> T {
        if self.state == "open" {
            # Python's `time` module has no now(). time.monotonic() measures
            # elapsed time without being affected by wall-clock changes.
            if time.monotonic() - self.last_failure_time > self.recovery_timeout {
                self.state = "half-open";
            } else {
                raise CircuitOpenError("Circuit breaker is open");
            }
        }

        try {
            result = func();
        } except Exception as e {
            self.failure_count += 1;
            self.last_failure_time = time.monotonic();

            # A failed trial call in half-open state reopens immediately.
            if self.state == "half-open" or self.failure_count >= self.failure_threshold {
                self.state = "open";
                print(f"Circuit breaker opened after {self.failure_count} failures");
            }

            raise;  # re-raise with the original traceback
        }

        # Any success resets the failure streak, so only consecutive failures
        # trip the breaker. A success in half-open state closes the circuit.
        self.failure_count = 0;
        self.state = "closed";
        return result;
    }
}

// Using circuit breaker in distributed operations
// Visits remote neighbours through one CircuitBreaker per machine. Machines
// whose breaker is open are skipped; other failures are printed, not raised.
// NOTE(review): the breakers live on this walker instance, so failure
// history starts fresh for every newly spawned walker.
walker ResilientDistributedWalker {
    has circuit_breakers: dict[str, CircuitBreaker] = {};

    // Returns the breaker for `machine_id`, creating it on first use.
    can get_breaker(machine_id: str) -> CircuitBreaker {
        if machine_id in self.circuit_breakers {
            return self.circuit_breakers[machine_id];
        }
        fresh = CircuitBreaker();
        self.circuit_breakers[machine_id] = fresh;
        return fresh;
    }

    can visit_remote with entry {
        for target in [-->:RemoteEdge:-->] {
            machine_id = target.__machine_id__;
            guard = self.get_breaker(machine_id);

            try {
                outcome = guard.call(lambda: visit_and_process(target));
                handle_result(outcome);
            } except CircuitOpenError {
                print(f"Skipping {machine_id} - circuit open");
            } except Exception as e {
                print(f"Error processing remote node: {e}");
            }
        }
    }
}

Error Recovery Patterns

// Compensation pattern for distributed transactions
// Runs operations one at a time and remembers a compensation (undo) action
// for each one that succeeds. If an operation fails, every recorded
// compensation runs in reverse order and TransactionFailedError is raised.
walker CompensatingTransaction {
    has operations: list[dict] = [];  # log of successful operations
    has compensations: list[callable] = [];  # undo actions not yet run

    can execute_with_compensation(
        operation: callable,
        compensation: callable
    ) -> any {
        try {
            result = operation();
        } except Exception as e {
            # Operation failed, run compensations in reverse order
            print(f"Operation failed: {e}, running compensations...");

            for comp in reversed(self.compensations) {
                try {
                    comp();
                } except Exception as comp_error {
                    # Best effort: keep undoing the remaining steps.
                    print(f"Compensation failed: {comp_error}");
                }
            }
            # These steps are now undone. Clear them so a later failure in the
            # same transaction does not compensate them a second time.
            self.compensations.clear();

            raise TransactionFailedError(
                f"Transaction rolled back due to: {e}"
            ) from e;
        }

        # Record the successful operation and its compensation outside the try,
        # so a bookkeeping problem is never mistaken for a failed operation.
        # Callables such as functools.partial have no __name__, so fall back
        # to repr() for those.
        self.operations.append({
            "operation": getattr(operation, "__name__", repr(operation)),
            "result": result,
            "timestamp": now()
        });
        self.compensations.append(compensation);

        return result;
    }
}

// Saga pattern for long-running transactions
// Spawns each step's walker in order and persists progress after every step.
// If a step raises SagaStepError, a CompensatingSaga walker is started on
// root for the steps completed so far, and this saga stops.
walker SagaOrchestrator {
    has saga_id: str;
    has steps: list[SagaStep];
    has completed_steps: list[str] = [];

    can execute_saga with entry {
        for step in self.steps {
            try {
                # Execute step (its result is not used by the orchestrator)
                spawn step.walker on step.target_node;

                self.completed_steps.append(step.id);

                # Persist saga state
                persist_saga_state(self.saga_id, self.completed_steps);

            } except SagaStepError as e {
                # Step failed, initiate compensation
                print(f"Saga step {step.id} failed: {e}");

                # Pass a snapshot so the compensating walker does not share
                # this walker's mutable list.
                spawn CompensatingSaga(
                    saga_id=self.saga_id,
                    failed_step=step.id,
                    completed_steps=list(self.completed_steps)
                ) on root;

                disengage;
            }
        }

        report {
            "saga_id": self.saga_id,
            "status": "completed",
            "steps": self.completed_steps
        };
    }
}

// Bulkhead pattern for isolation
// Limits how many visiting walkers may hold this resource at once. Extra
// walkers are queued (suspended) and handed a slot in arrival order as
// earlier holders leave.
node ResourcePool {
    has name: str;
    has max_concurrent: int;
    has active_count: int = 0;
    has queue: list[walker] = [];

    can acquire_resource with visitor entry {
        :synchronized: {
            if self.active_count < self.max_concurrent {
                self.active_count += 1;
                visitor.resource_acquired = True;
            } else {
                # Add to queue
                # NOTE(review): suspend() is called while the synchronized
                # block is held; confirm it does not block here.
                self.queue.append(visitor);
                visitor.suspend();
            }
        }
    }

    can release_resource with visitor exit {
        :synchronized: {
            # Only a walker that actually holds a slot may free one. Without
            # this check, a walker exiting without ever being granted a slot
            # would push active_count below the number really in use.
            if getattr(visitor, "resource_acquired", False) {
                visitor.resource_acquired = False;
                self.active_count -= 1;

                # Hand the freed slot to the longest-waiting walker.
                if self.queue {
                    next_walker = self.queue.pop(0);
                    self.active_count += 1;
                    next_walker.resource_acquired = True;
                    next_walker.resume();
                }
            } elif visitor in self.queue {
                # Leaving while still queued: drop it so it is never resumed.
                self.queue.remove(visitor);
            }
        }
    }
}

Summary

This chapter covered Jac's advanced features that enable production-ready applications:

Concurrent Programming

  • Parallel Walkers: Natural concurrency through spawn
  • Async/Await: Full support for asynchronous operations
  • Synchronization: Thread-safe graph operations and coordination
  • Patterns: Pipeline, map-reduce, and barrier synchronization

Type System

  • Inference: Smart type inference reduces boilerplate
  • Generics: Powerful generic programming with constraints
  • Graph Types: Type-safe topological operations
  • Advanced Features: Union types, protocols, and variadic generics

Error Handling

  • Traversal Errors: Graceful handling of graph navigation failures
  • Distributed Errors: Cross-machine error propagation
  • Recovery Patterns: Compensation, sagas, and circuit breakers
  • Resilience: Building fault-tolerant distributed systems

These advanced features, combined with Jac's scale-agnostic programming model, provide all the tools needed to build sophisticated, production-ready applications that can scale from single-user prototypes to global distributed systems.

In the next chapter, we'll explore design patterns specific to Jac that leverage these advanced features to solve common architectural challenges.