Async Walker#
Overview#
Async walkers in Jac Cloud allow you to execute walkers asynchronously in separate threads. This is particularly useful for long-running operations that shouldn't block the main execution flow, such as intensive computations or external API calls.
Basic Usage#
To create an async walker, simply add the async
keyword before the walker declaration:
async walker sample {
has value: int = 0;
can enter with `root entry {
print("test");
self.value = 1;
}
}
Key characteristics: - Executes in a separate thread without blocking the main application - Returns immediately with a reference ID while continuing execution in the background - Similar to task scheduling but with a simpler syntax - Results can be retrieved later using the walker ID
How It Works#
- When an async walker is triggered, it's scheduled as a background task
- The API call returns immediately with a walker ID reference
- The walker executes asynchronously in its own thread
- Results are stored in the database and can be retrieved later
Response Format#
When you call an async walker, you receive a response containing the walker's unique ID:
Retrieving Results#
You can retrieve the results of an async walker by using its ID:
walker view_sample_result {
has walker_id: str;
can enter with `root entry {
// Get a reference to the walker instance
wlk = &walker_id;
// Access the walker's attributes
print(wlk.value); // Will be 1 after execution completes
// Check execution status and metadata
schedule_info = wlk.__jac__.schedule;
// Print execution details
print(f"Status: {schedule_info.status}");
print(f"Executed at: {schedule_info.executed_date}");
// Check for errors
if schedule_info.error:
print(f"Error: {schedule_info.error}");
}
}
Available Status Information#
The __jac__.schedule
object contains all execution metadata:
Field | Description |
---|---|
status |
Current execution status (pending, running, completed, failed) |
node_id |
ID of the node where the walker was executed |
root_id |
ID of the root node of the user who triggered the walker |
execute_date |
When the walker was scheduled to execute |
executed_date |
When the walker actually executed |
http_status |
HTTP status code for the execution result |
reports |
Any values reported during walker execution |
custom |
Custom metadata associated with the walker |
error |
Error message if execution failed |
Example: Long-Running Process#
async walker process_large_dataset {
has dataset_id: str;
has results: list = [];
can enter with `root entry {
// Simulate long-running process
dataset = get_dataset(self.dataset_id);
for item in dataset:
// Do intensive processing
processed = complex_computation(item);
self.results.append(processed);
// Final result is stored in the walker
print("Processing complete!");
}
}
// Retrieve results when needed
walker check_processing {
has process_id: str;
can enter with `root entry {
process = &process_id;
if process.__jac__.schedule.status == "completed":
print("Results:", process.results);
else:
print("Still processing...");
}
}
Common Use Cases#
1. Processing Large Datasets#
async walker analyze_data {
has dataset_id: str;
has summary: dict = {};
can enter with `root entry {
// Fetch data (could take minutes)
data = fetch_dataset(self.dataset_id);
// Process each item (CPU intensive)
for item in data:
process_item(item);
// Generate final summary
self.summary = create_summary(data);
}
}
2. Generating Reports#
async walker generate_report {
has user_id: str;
has report_type: str;
has report_url: str;
can enter with `root entry {
// Collect user data
user_data = fetch_user_data(self.user_id);
// Generate PDF (slow operation)
report_file = create_pdf_report(user_data, self.report_type);
// Upload to storage
self.report_url = upload_file(report_file);
// Optional: Notify user
send_email(self.user_id, "Your report is ready!", self.report_url);
}
}
3. External API Integration#
async walker sync_with_external_system {
has account_id: str;
has sync_status: str = "pending";
has sync_results: list = [];
can enter with `root entry {
// Connect to external API
self.sync_status = "connecting";
api_client = connect_to_api();
// Fetch data (network-bound, can be slow)
self.sync_status = "fetching";
external_data = api_client.fetch_account_data(self.account_id);
// Process and save data
self.sync_status = "processing";
for item in external_data:
result = process_and_save(item);
self.sync_results.append(result);
self.sync_status = "complete";
}
}
Advanced Techniques#
Combining with WebSockets for Real-time Updates#
import from jac_cloud.plugin {WEBSOCKET_MANAGER as socket}
async walker process_with_updates {
has client_id: str;
has progress: int = 0;
can enter with `root entry {
// Send initial notification
socket.notify_clients([self.client_id], {
"type": "progress",
"data": {"progress": 0}
});
// Process in chunks and send updates
for i in range(10):
process_chunk(i);
self.progress = (i+1) * 10;
// Send progress update via WebSocket
socket.notify_clients([self.client_id], {
"type": "progress",
"data": {"progress": self.progress}
});
// Send completion notification
socket.notify_clients([self.client_id], {
"type": "complete",
"data": {"message": "Processing complete!"}
});
}
}
Error Handling#
async walker safe_process {
has input_id: str;
has success: bool = false;
has error_message: str = "";
has results: dict = {};
can enter with `root entry {
try {
// Attempt processing
data = fetch_data(self.input_id);
if not data:
self.error_message = "No data found";
return;
self.results = process_data(data);
self.success = true;
} except e {
// Capture error details
self.error_message = str(e);
log_error(self.input_id, str(e));
}
}
}
Next Steps#
- Learn about Task Scheduling for recurring tasks
- Explore WebSockets for real-time communication
- Set up Logging to monitor your async processes