Exporting Workers in Package APIs

Kinode packages can export workers and expose them in easy-to-use ways. Exporting and importing functions is discussed in the previous recipe. This recipe focuses on:

  1. A simple example of exporting a worker and exposing it in an ergonomic API.
  2. A simple example of importing a worker.
  3. Demonstrations of kit tooling for the above.

Exporting a Worker

Exporting or importing a worker works much the same as exporting or importing an API, as discussed in the previous recipe. The main difference, in general, is that the exporter must include the worker when running kit build — see below. In the specific example here, the exporter also exports a function that makes using the worker ergonomic: that function, start_download(), spawn()s the worker. In addition, in this specific example, the importer handles the message types of the worker.

Example: File Transfer

WIT API

#![allow(unused)]
fn main() {
...
/// Message types and helper function for the file-transfer worker.
/// Processes that import this interface receive `request`/`response`
/// messages and can delegate downloads to `start-download`.
interface file-transfer-worker {
    use standard.{address};

    /// external-facing requests
    variant request {
        /// download starts a download.
        /// * used by requestor to start whole process
        /// * used by provider to spin up worker to serve request
        download(download-request),
        /// progress is from worker to parent
        /// * acks not required, but provided for completeness
        progress(progress-request),
    }

    /// responses to `request`
    variant response {
        download(result<_, string>),
        /// ack: not required, but provided for completeness
        progress,
    }

    /// requests used between workers to transfer the file
    /// parent will not receive these, so need not handle them
    variant internal-request {
        /// one piece of the file; the bytes travel in the message blob
        chunk(chunk-request),
        /// total file size in bytes, sent before the chunks
        size(u64),
    }

    /// parameters of a download
    record download-request {
        /// file name, relative to the package's `files` drive
        name: string,
        /// the other party of the transfer
        target: address,
        /// true on the downloading side, false on the serving side
        is-requestor: bool,
    }

    /// progress report for the file `name`
    record progress-request {
        name: string,
        /// percentage of the file received so far
        progress: u64,
    }

    /// describes the chunk carried by the accompanying blob
    record chunk-request {
        name: string,
        /// byte offset of the chunk within the file
        offset: u64,
        /// chunk length in bytes
        length: u64,
    }

    /// easiest way to use file-transfer-worker
    /// handle file-transfer-worker::request by calling this helper function
    start-download: func(
        our: address,
        source: address,
        name: string,
        target: address,
        is-requestor: bool,
    ) -> result<_, string>;
}

/// World implemented by the API crate: it exports the
/// `file-transfer-worker` interface (including `start-download`).
world file-transfer-worker-api-v0 {
    export file-transfer-worker;
}
...
}

API Function Definitions

The API here spawn()s a worker, and so the worker is part of the API.

API
#![allow(unused)]
fn main() {
use crate::exports::kinode::process::file_transfer_worker::{
    DownloadRequest, Guest, Request as WorkerRequest, Response as WorkerResponse,
};
use crate::kinode::process::standard::Address as WitAddress;
use kinode_process_lib::{our_capabilities, spawn, Address, OnExit, Request, Response};

// Generate Rust bindings for the `file-transfer-worker-api-v0` world from the
// WIT files under `target/wit`. The extra derives make the generated types
// (de)serializable so they can be used directly as message bodies.
wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-worker-api-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

/// Spawns a file-transfer worker and hands it a download job.
///
/// * `our` - address of the calling process; its package and publisher locate
///   the worker Wasm, and its node hosts the spawned worker.
/// * `source` - sender of the request being handled.
/// * `name` - name of the file to transfer.
/// * `target` - peer named in the download request.
/// * `is_requestor` - whether this side is the downloader.
///
/// The worker is told to talk to `target` when we are the requestor and to
/// `source` otherwise. A `Download(Ok(()))` response is sent to the original
/// request before the job is forwarded to the worker.
///
/// # Errors
/// Returns an error if spawning the worker or sending either message fails.
fn start_download(
    our: &WitAddress,
    source: &WitAddress,
    name: &str,
    target: &WitAddress,
    is_requestor: bool,
) -> anyhow::Result<()> {
    // spin up a worker, initialize based on whether it's a downloader or a sender.
    let worker_wasm = format!(
        "{}:{}/pkg/file_transfer_worker.wasm",
        our.process.package_name, our.process.publisher_node,
    );
    let worker_process = spawn(
        None,
        &worker_wasm,
        OnExit::None,
        our_capabilities(),
        vec![],
        false,
    )?;

    let peer = match is_requestor {
        true => target,
        false => source,
    };
    let worker_address = Address {
        node: our.node.clone(),
        process: worker_process,
    };

    // ack the request we are handling
    Response::new()
        .body(WorkerResponse::Download(Ok(())))
        .send()?;

    // pass the job on to the freshly spawned worker
    let job = DownloadRequest {
        name: name.to_owned(),
        target: peer.clone(),
        is_requestor,
    };
    Request::new()
        .expects_response(5)
        .body(WorkerRequest::Download(job))
        .target(&worker_address)
        .send()?;

    Ok(())
}

/// Unit type that carries the exported `file-transfer-worker` implementation.
struct Api;

impl Guest for Api {
    /// Exported entry point: delegates to the free function `start_download`
    /// and flattens its error into a debug-formatted string.
    fn start_download(
        our: WitAddress,
        source: WitAddress,
        name: String,
        target: WitAddress,
        is_requestor: bool,
    ) -> Result<(), String> {
        start_download(&our, &source, &name, &target, is_requestor)
            .map_err(|e| format!("{e:?}"))
    }
}
export!(Api);
}
Worker
#![allow(unused)]
fn main() {
use crate::kinode::process::file_transfer_worker::{
    ChunkRequest, DownloadRequest, InternalRequest, ProgressRequest, Request as WorkerRequest,
    Response as WorkerResponse,
};
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use kinode_process_lib::{
    await_message, call_init, get_blob, println,
    vfs::{open_dir, open_file, Directory, File, SeekFrom},
    Address, Message, ProcessId, Request, Response,
};

// Generate Rust bindings from the WIT files under `target/wit`.
// NOTE(review): the `{package_name_kebab}` / `{publisher_dotted_kebab}`
// placeholders look like template variables that are filled in when the
// package is created from the template — confirm.
wit_bindgen::generate!({
    path: "target/wit",
    world: "{package_name_kebab}-{publisher_dotted_kebab}-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

/// Union of every message body the worker can receive.
///
/// Being `untagged`, deserialization tries the variants in declaration order
/// and picks the first one whose shape matches, so the order below matters.
#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    WorkerRequest(WorkerRequest),
    InternalRequest(InternalRequest),

    // responses
    WorkerResponse(WorkerResponse),
}

impl From<WitAddress> for Address {
    fn from(address: WitAddress) -> Self {
        Address {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<WitProcessId> for ProcessId {
    fn from(process: WitProcessId) -> Self {
        ProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

/// Maximum number of file bytes sent per chunk message (1 MiB).
const CHUNK_SIZE: u64 = 1024 * 1024;

/// Handles a `WorkerRequest` addressed to this worker.
///
/// * `Download` - acks the request and opens (creating if necessary) `name`
///   inside `files_dir`. As requestor, the handle is stored in `file` and the
///   request is forwarded to `target` with `is_requestor: false`. As sender,
///   the file size and then the file contents, in pieces of at most
///   `CHUNK_SIZE` bytes, are sent to `target`.
/// * `Progress` - never expected by a worker; reported as an error.
///
/// Returns `Ok(true)` when the worker is finished (sender side, after the last
/// chunk was sent) and `Ok(false)` when it must keep running.
///
/// # Errors
/// Propagates VFS and message-send failures.
fn handle_worker_request(
    request: &WorkerRequest,
    file: &mut Option<File>,
    files_dir: &Directory,
) -> anyhow::Result<bool> {
    match request {
        WorkerRequest::Download(DownloadRequest {
            name,
            target,
            is_requestor,
        }) => {
            Response::new()
                .body(WorkerResponse::Download(Ok(())))
                .send()?;

            // open/create empty file in both cases.
            let mut active_file = open_file(&format!("{}/{}", files_dir.path, &name), true, None)?;

            if *is_requestor {
                *file = Some(active_file);
                Request::new()
                    .expects_response(5)
                    .body(WorkerRequest::Download(DownloadRequest {
                        name: name.to_string(),
                        target: target.clone(),
                        is_requestor: false,
                    }))
                    .target::<Address>(target.clone().into())
                    .send()?;
            } else {
                // we are sender: chunk the data, and send it.
                let size = active_file.metadata()?.len;
                // exact integer ceiling division; avoids lossy u64 -> f64 casts
                let num_chunks = size / CHUNK_SIZE + u64::from(size % CHUNK_SIZE != 0);

                // give receiving worker file size so it can track download progress
                Request::new()
                    .body(InternalRequest::Size(size))
                    .target(target.clone())
                    .send()?;

                active_file.seek(SeekFrom::Start(0))?;

                for i in 0..num_chunks {
                    let offset = i * CHUNK_SIZE;
                    // the final chunk may be shorter than CHUNK_SIZE
                    let length = CHUNK_SIZE.min(size - offset);

                    let mut buffer = vec![0; length as usize];
                    active_file.read_at(&mut buffer)?;

                    Request::new()
                        .body(InternalRequest::Chunk(ChunkRequest {
                            name: name.clone(),
                            offset,
                            length,
                        }))
                        .target(target.clone())
                        .blob_bytes(buffer)
                        .send()?;
                }
                // everything has been sent: the sender worker is done
                return Ok(true);
            }
        }
        WorkerRequest::Progress(_) => {
            return Err(anyhow::anyhow!(
                "worker: got unexpected WorkerRequest::Progress",
            ));
        }
    }
    Ok(false)
}

/// Handles worker-to-worker transfer messages on the receiving side.
///
/// * `Chunk` - writes the blob bytes to `file` and, when both `parent` and
///   `size` are known, reports the percentage received to `parent`.
/// * `Size` - records the total file size in `size`.
///
/// Returns `Ok(true)` when the download is complete and the worker should
/// exit: either progress reached 100%, or the announced size is zero (the
/// sender sends no chunks for an empty file, so nothing else will arrive).
///
/// # Errors
/// Fails when a chunk arrives before a file was opened, when a chunk carries
/// no blob, or when writing / sending fails.
fn handle_internal_request(
    request: &InternalRequest,
    file: &mut Option<File>,
    size: &mut Option<u64>,
    parent: &Option<Address>,
) -> anyhow::Result<bool> {
    match request {
        InternalRequest::Chunk(ChunkRequest {
            name,
            offset,
            length,
        }) => {
            // someone sending a chunk to us
            let file = match file {
                Some(file) => file,
                None => {
                    return Err(anyhow::anyhow!(
                        "worker: receive error: no file initialized"
                    ));
                }
            };

            let bytes = match get_blob() {
                Some(blob) => blob.bytes,
                None => {
                    return Err(anyhow::anyhow!("worker: receive error: no blob"));
                }
            };

            file.write_all(&bytes)?;

            // if sender has sent us a size, give a progress update to main transfer
            let Some(ref parent) = parent else {
                return Ok(false);
            };
            if let Some(size) = size {
                let progress = ((offset + length) as f64 / *size as f64 * 100.0) as u64;

                Request::new()
                    .expects_response(5)
                    .body(WorkerRequest::Progress(ProgressRequest {
                        name: name.to_string(),
                        progress,
                    }))
                    .target(parent)
                    .send()?;

                if progress >= 100 {
                    return Ok(true);
                }
            }
        }
        InternalRequest::Size(incoming_size) => {
            *size = Some(*incoming_size);
            // An empty file is followed by no chunks at all, so without this
            // check the receiving worker would wait forever.
            if *incoming_size == 0 {
                return Ok(true);
            }
        }
    }
    Ok(false)
}

/// Inspects a response received by the worker.
///
/// A failed `Download` response becomes an error; every other response is
/// ignored. Never requests an exit, so the success value is always `false`.
fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<bool> {
    match response {
        WorkerResponse::Download(Err(e)) => Err(anyhow::anyhow!("{e}")),
        WorkerResponse::Download(Ok(())) | WorkerResponse::Progress => Ok(false),
    }
}

fn handle_message(
    our: &Address,
    message: &Message,
    file: &mut Option<File>,
    files_dir: &Directory,
    size: &mut Option<u64>,
    parent: &mut Option<Address>,
) -> anyhow::Result<bool> {
    return Ok(match message.body().try_into()? {
        // requests
        Msg::WorkerRequest(ref wr) => {
            *parent = Some(message.source().clone());
            handle_worker_request(wr, file, files_dir)?
        }
        Msg::InternalRequest(ref ir) => handle_internal_request(ir, file, size, parent)?,

        // responses
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr)?,
    });
}

call_init!(init);
/// Worker entry point: opens the package's `files` drive and processes
/// messages until a handler reports that the transfer is finished.
///
/// Panics if the drive cannot be opened.
fn init(our: Address) {
    println!("worker: begin");
    let started_at = std::time::Instant::now();

    let drive_path = format!("{}/files", our.package_id());
    let files_dir = open_dir(&drive_path, false, None).unwrap();

    // transfer state shared across messages
    let mut file: Option<File> = None;
    let mut size: Option<u64> = None;
    let mut parent: Option<Address> = None;

    loop {
        let message = match await_message() {
            Ok(message) => message,
            Err(send_error) => {
                println!("worker: got SendError: {send_error}");
                continue;
            }
        };

        match handle_message(&our, &message, &mut file, &files_dir, &mut size, &mut parent) {
            Ok(true) => {
                println!("worker: done: exiting, took {:?}", started_at.elapsed());
                break;
            }
            Ok(false) => {}
            Err(e) => println!("worker: got error while handling message: {e:?}"),
        }
    }
}
}

Process

The file_transfer process imports and uses the exported start_download():

#![allow(unused)]
fn main() {
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use crate::kinode::process::file_transfer_worker::{start_download, Request as WorkerRequest, Response as WorkerResponse, DownloadRequest, ProgressRequest};
use crate::kinode::process::{package_name}::{Request as TransferRequest, Response as TransferResponse, FileInfo};
use kinode_process_lib::{
    await_message, call_init, println,
    vfs::{create_drive, metadata, open_dir, Directory, FileType},
    Address, Message, ProcessId, Response,
};

// Generate Rust bindings from the WIT files under `target/wit`.
// NOTE(review): the `{package_name_kebab}` / `{publisher_dotted_kebab}`
// placeholders look like template variables that are filled in when the
// package is created from the template — confirm.
wit_bindgen::generate!({
    path: "target/wit",
    world: "{package_name_kebab}-{publisher_dotted_kebab}-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

/// Union of every message body the file_transfer process can receive.
///
/// Being `untagged`, deserialization tries the variants in declaration order
/// and picks the first one whose shape matches, so the order below matters.
#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    TransferRequest(TransferRequest),
    WorkerRequest(WorkerRequest),

    // responses
    TransferResponse(TransferResponse),
    WorkerResponse(WorkerResponse),
}

impl From<Address> for WitAddress {
    fn from(address: Address) -> Self {
        WitAddress {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<ProcessId> for WitProcessId {
    fn from(process: ProcessId) -> Self {
        WitProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

/// Lists the regular files in `files_dir` together with their sizes.
///
/// Entries that are not files, and files whose metadata cannot be read, are
/// silently left out.
///
/// # Errors
/// Fails only when the directory itself cannot be read.
fn ls_files(files_dir: &Directory) -> anyhow::Result<Vec<FileInfo>> {
    let mut listing: Vec<FileInfo> = Vec::new();
    for entry in files_dir.read()?.iter() {
        if !matches!(entry.file_type, FileType::File) {
            continue;
        }
        if let Ok(meta) = metadata(&entry.path, None) {
            listing.push(FileInfo {
                name: entry.path.clone(),
                size: meta.len,
            });
        }
    }
    Ok(listing)
}

/// Answers a `TransferRequest`.
///
/// `ListFiles` is answered with the current contents of `files_dir`.
///
/// # Errors
/// Fails when listing the directory or sending the response fails.
fn handle_transfer_request(
    request: &TransferRequest,
    files_dir: &Directory,
) -> anyhow::Result<()> {
    match request {
        TransferRequest::ListFiles => {
            let listing = TransferResponse::ListFiles(ls_files(files_dir)?);
            Response::new().body(listing).send()?;
        }
    }
    Ok(())
}

fn handle_worker_request(
    our: &Address,
    source: &Address,
    request: &WorkerRequest,
) -> anyhow::Result<()> {
    match request {
        WorkerRequest::Download(DownloadRequest { ref name, ref target, is_requestor }) => {
            match start_download(
                &our.clone().into(),
                &source.clone().into(),
                name,
                target,
                *is_requestor,
            ) {
                Ok(_) => {}
                Err(e) => return Err(anyhow::anyhow!("{e}")),
            }
        }
        WorkerRequest::Progress(ProgressRequest { name, progress }) => {
            println!("{} progress: {}%", name, progress);
            Response::new()
                .body(WorkerResponse::Progress)
                .send()?;
        }
    }
    Ok(())
}

/// Prints the file listing received from `source`.
///
/// Output is a two-line header followed by one `name<TAB><TAB>size` row per
/// file, where `name` is the last `/`-separated component of the file path.
/// Rows are separated by newlines so that each file gets its own line.
fn handle_transfer_response(source: &Address, response: &TransferResponse) -> anyhow::Result<()> {
    match response {
        TransferResponse::ListFiles(files) => {
            let rows: Vec<String> = files
                .iter()
                .map(|file| {
                    // `rsplit` always yields at least one item; the fallback is never hit
                    let short_name = file
                        .name
                        .rsplit('/')
                        .next()
                        .unwrap_or(file.name.as_str());
                    format!("{}\t\t{}", short_name, file.size)
                })
                .collect();
            println!(
                "{source} available files:\nFile\t\tSize (bytes)\n{}",
                rows.join("\n"),
            );
        }
    }
    Ok(())
}

/// Inspects a worker response: a failed `Download` becomes an error, anything
/// else is accepted silently.
fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<()> {
    match response {
        WorkerResponse::Download(Err(e)) => Err(anyhow::anyhow!("{e}")),
        WorkerResponse::Download(Ok(())) | WorkerResponse::Progress => Ok(()),
    }
}

fn handle_message(
    our: &Address,
    message: &Message,
    files_dir: &Directory,
) -> anyhow::Result<()> {
    match message.body().try_into()? {
        // requests
        Msg::TransferRequest(ref tr) => handle_transfer_request(tr, files_dir),
        Msg::WorkerRequest(ref wr) => handle_worker_request(our, message.source(), wr),

        // responses
        Msg::TransferResponse(ref tr) => handle_transfer_response(message.source(), tr),
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr),
    }
}

call_init!(init);
/// Process entry point: creates and opens the package's `files` drive, then
/// handles messages forever, logging (not propagating) any errors.
///
/// Panics if the drive cannot be created or opened.
fn init(our: Address) {
    println!("begin");

    let drive_path = create_drive(our.package_id(), "files", None).unwrap();
    let files_dir = open_dir(&drive_path, false, None).unwrap();

    loop {
        let message = match await_message() {
            Ok(message) => message,
            Err(send_error) => {
                println!("got SendError: {send_error}");
                continue;
            }
        };
        if let Err(e) = handle_message(&our, &message, &files_dir) {
            println!("got error while handling message: {e:?}");
        }
    }
}
}

Importing an API

Dependencies

metadata.json

The metadata.json file has a properties.dependencies field. When the dependencies field is populated, kit build will fetch that dependency from a Kinode hosting it.

See previous recipe for more discussion of dependencies.

Example: Chat with File Transfer

The example here is the chat template (created with kit n chat) with the small addition of file transfer functionality. Adding file transfer requires changes to the WIT API (e.g., importing the file-transfer-worker interface) as well as to the process itself, so that it makes use of the imported types and functions. Compare the process with the unmodified kit n chat process.

WIT API

#![allow(unused)]
fn main() {
/// Chat message types for the chat-with-file-transfer process.
interface chat-with-file-transfer {
    /// requests the chat process accepts
    variant request {
        /// send (or deliver) a chat message
        send(send-request),
        /// history of chat with given node
        history(string),
    }

    /// responses to `request`
    variant response {
        /// ack of a `send`
        send,
        /// archived messages exchanged with the requested node
        history(list<chat-message>),
    }

    record send-request {
        /// node the message is addressed to
        target: string,
        message: string,
    }

    record chat-message {
        /// node that wrote the message
        author: string,
        content: string,
    }
}

/// World of the chat process: besides its own chat interface it imports
/// `file-transfer-worker`, which is provided by the file_transfer dependency.
world chat-with-file-transfer-template-dot-os-v0 {
    import chat-with-file-transfer;
    import file-transfer-worker;
    include process-v1;
}
}

Process

#![allow(unused)]
fn main() {
use std::collections::HashMap;

use crate::kinode::process::chat_with_file_transfer::{
    ChatMessage, Request as ChatRequest, Response as ChatResponse, SendRequest,
};
use crate::kinode::process::file_transfer_worker::{
    start_download, DownloadRequest, ProgressRequest, Request as WorkerRequest,
    Response as WorkerResponse,
};
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use kinode_process_lib::{
    await_message, call_init, get_capability, println,
    vfs::{create_drive, open_file},
    Address, Message, ProcessId, Request, Response,
};

// Generate Rust bindings for the chat world (which also imports the
// `file-transfer-worker` interface) from the WIT files under `target/wit`.
// The extra derives make the generated types usable as message bodies.
wit_bindgen::generate!({
    path: "target/wit",
    world: "chat-with-file-transfer-template-dot-os-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

/// Union of every message body the chat process can receive.
///
/// Being `untagged`, deserialization tries the variants in declaration order
/// and picks the first one whose shape matches, so the order below matters.
#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    ChatRequest(ChatRequest),
    WorkerRequest(WorkerRequest),

    // responses
    WorkerResponse(WorkerResponse),
}

impl From<Address> for WitAddress {
    fn from(address: Address) -> Self {
        WitAddress {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<ProcessId> for WitProcessId {
    fn from(process: ProcessId) -> Self {
        WitProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

/// Chat history, keyed by the name of the node on the other end of the chat.
type MessageArchive = HashMap<String, Vec<ChatMessage>>;

/// Handles a chat request.
///
/// * `Send` addressed to our own node: the message is an incoming one; it is
///   printed and archived under the sender's node.
/// * `Send` addressed to another node: the request is forwarded to the chat
///   process on that node (waiting up to 5 seconds for its response) and the
///   message is archived under the target node with us as author.
/// * `History`: responds with the archived messages for the given node (empty
///   if there are none).
///
/// # Errors
/// Returns an error, instead of panicking, when the remote node cannot be
/// reached or a response cannot be sent; the caller logs it and keeps the
/// process running.
fn handle_chat_request(
    our: &Address,
    source: &Address,
    request: &ChatRequest,
    message_archive: &mut MessageArchive,
) -> anyhow::Result<()> {
    match request {
        ChatRequest::Send(SendRequest { target, message }) => {
            // (archive key, author) of the message being recorded
            let (counterparty, author) = if target == &our.node {
                println!("{}: {}", source.node, message);
                (source.node.clone(), source.node.clone())
            } else {
                Request::new()
                    .target(Address {
                        node: target.clone(),
                        process: "chat-with-file-transfer:chat-with-file-transfer:template.os"
                            .parse()?,
                    })
                    .body(request)
                    .send_and_await_response(5)?
                    // offline / timed-out peer: report it rather than panic
                    .map_err(|e| anyhow::anyhow!("{e}"))?;
                (target.clone(), our.node.clone())
            };

            message_archive
                .entry(counterparty)
                .or_default()
                .push(ChatMessage {
                    author,
                    content: message.clone(),
                });

            Response::new().body(ChatResponse::Send).send()?;
        }
        ChatRequest::History(node) => {
            let history = message_archive.get(node).cloned().unwrap_or_default();
            Response::new()
                .body(ChatResponse::History(history))
                .send()?;
        }
    }
    Ok(())
}

fn handle_worker_request(
    our: &Address,
    source: &Address,
    request: &WorkerRequest,
) -> anyhow::Result<()> {
    match request {
        WorkerRequest::Download(DownloadRequest {
            ref name,
            ref target,
            is_requestor,
        }) => {
            match start_download(
                &our.clone().into(),
                &source.clone().into(),
                name,
                target,
                *is_requestor,
            ) {
                Ok(_) => {}
                Err(e) => return Err(anyhow::anyhow!("{e}")),
            }
        }
        WorkerRequest::Progress(ProgressRequest { name, progress }) => {
            println!("{} progress: {}%", name, progress);
            Response::new().body(WorkerResponse::Progress).send()?;
        }
    }
    Ok(())
}

/// Inspects a worker response: a failed `Download` becomes an error, anything
/// else is accepted silently.
fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<()> {
    match response {
        WorkerResponse::Download(Err(e)) => Err(anyhow::anyhow!("{e}")),
        WorkerResponse::Download(Ok(())) | WorkerResponse::Progress => Ok(()),
    }
}

fn handle_message(
    our: &Address,
    message: &Message,
    message_archive: &mut MessageArchive,
) -> anyhow::Result<()> {
    match message.body().try_into()? {
        // requests
        Msg::ChatRequest(ref cr) => handle_chat_request(our, message.source(), cr, message_archive),
        Msg::WorkerRequest(ref wr) => handle_worker_request(our, message.source(), wr),

        // responses
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr),
    }
}

/// Setup commands handled once at startup when built with the `test` feature.
#[cfg(feature = "test")]
#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
enum Setup {
    /// Ask the process to hand over its VFS read capability for its drive.
    Caps,
    /// Ask the process to write `contents` to the file `name` in its drive.
    WriteFile { name: String, contents: Vec<u8> },
}

/// Waits for a single `Setup` message and carries it out (test builds only).
///
/// * `Setup::Caps` - responds with our VFS read capability for `drive_path`.
/// * `Setup::WriteFile` - writes `contents` to `name` inside `drive_path`.
///
/// # Errors
/// Fails when no message can be received, the body is not a `Setup`, the read
/// capability is missing, or a VFS / send operation fails.
#[cfg(feature = "test")]
fn handle_tester_setup(our: &Address, drive_path: &str) -> anyhow::Result<()> {
    println!("awaiting setup...");

    let message = await_message()
        .map_err(|e| anyhow::anyhow!("failed to receive setup message: {e}"))?;
    // TODO: confirm its from tester
    match message.body().try_into()? {
        Setup::Caps => {
            println!("got caps...");
            let vfs_read_cap = serde_json::json!({
                "kind": "read",
                "drive": drive_path,
            })
            .to_string();
            let vfs_address = Address {
                node: our.node.clone(),
                process: "vfs:distro:sys".parse()?,
            };

            let read_cap = get_capability(&vfs_address, &vfs_read_cap).ok_or_else(|| {
                anyhow::anyhow!("missing VFS read capability for drive {drive_path}")
            })?;

            Response::new()
                .body(vec![])
                .capabilities(vec![read_cap])
                .send()?;
            println!("sent caps");
        }
        Setup::WriteFile {
            ref name,
            ref contents,
        } => {
            println!("got write file...");
            let file = open_file(&format!("{drive_path}/{name}"), true, None)?;
            file.write(contents)?;
        }
    }
    println!("setup done");
    Ok(())
}

call_init!(init);
fn init(our: Address) {
    println!("begin");

    let drive_path = create_drive(our.package_id(), "files", None).unwrap();
    let mut message_archive = HashMap::new();

    #[cfg(feature = "test")]
    handle_tester_setup(&our, &drive_path).unwrap();

    loop {
        match await_message() {
            Err(send_error) => println!("got SendError: {send_error}"),
            Ok(ref message) => match handle_message(&our, message, &mut message_archive) {
                Ok(_) => {}
                Err(e) => println!("got error while handling message: {e:?}"),
            },
        }
    }
}
}

Chat with File Transfer Usage Example

Build

# Start fake nodes.
kit f
kit f -o /tmp/kinode-fake-node-2 -p 8081 -f fake2.dev

# Create & build file_transfer dependency.
## The `-a` adds the worker Wasm file to the API so it can be exported properly.
kit n file_transfer -t file_transfer
kit b file_transfer -a file_transfer/pkg/file_transfer_worker.wasm

# Build chat_with_file_transfer.
## The `-l` satisfies the dependency using a local path:
## the `file_transfer` package directory created above.
kit b src/../code/chat-with-file-transfer -l file_transfer

# Start chat_with_file_transfer on fake nodes.
kit s src/../code/chat-with-file-transfer
kit s src/../code/chat-with-file-transfer -p 8081

Usage

# First, put a file into `/tmp/kinode-fake-node-2/vfs/chat-with-file-transfer:template.os/files/`, e.g.:
echo 'hello world' > /tmp/kinode-fake-node-2/vfs/chat-with-file-transfer:template.os/files/my_file.txt

# In fake.dev terminal, download the file.
download:chat-with-file-transfer:template.os my_file.txt fake2.dev

# Confirm file was downloaded:
cat /tmp/kinode-fake-node/vfs/chat-with-file-transfer:template.os/files/my_file.txt
Get Help: