Talking to the Outside World

Kinode communicates with the Kinode network using the Kinode Networking Protocol. But nodes must also be able to communicate with the outside world. These recipes will walk through a variety of communication methods. Briefly, Kinode can speak both HTTP and WebSockets, and can operate as a client or a server for both. You can find the APIs for HTTP client and server, as well as for WebSockets elsewhere. This document focuses on simple usage examples of each.

HTTP

HTTP Client

#![allow(unused)]
fn main() {
/// Simple example of sending an HTTP request.
/// Usage:
/// ```
/// # Start node.
/// kit f
///
/// # Start package from a new terminal.
/// kit bs http_client
/// ```
use kinode_process_lib::{call_init, http, println, Address};

wit_bindgen::generate!({
    path: "target/wit",
    world: "process-v0",
});

const URL: &str = "https://raw.githubusercontent.com/kinode-dao/kinode-wit/master/kinode.wit";

call_init!(init);
fn init(_our: Address) {
    println!("begin");

    let url = url::Url::parse(URL).expect("failed to parse url");
    let response = http::send_request_await_response(http::Method::GET, url, None, 5, vec![]);

    match response {
        Err(e) => panic!("request failed: {e:?}"),
        Ok(r) => {
            let r = String::from_utf8(r.body().clone()).expect("couldn't read response");
            println!("{r}");
        }
    }
}
}

Full example package.

HTTP Server

#![allow(unused)]
fn main() {
/// Simple example of running an HTTP server.
/// Usage:
/// ```
/// # Start node.
/// kit f
///
/// # Start package from a new terminal.
/// kit bs http_server
///
/// # Send an HTTP request.
/// curl -X PUT -d '{"Hello": "greetings"}' http://localhost:8080/http_server:http_server:template.os
/// ```
use anyhow::{anyhow, Result};

use kinode_process_lib::{await_message, call_init, get_blob, http, println, Address, Message};

wit_bindgen::generate!({
    path: "target/wit",
    world: "process-v0",
});

/// Handle a message from the HTTP server.
fn handle_http_message(message: &Message) -> Result<()> {
    let Ok(server_request) = http::HttpServerRequest::from_bytes(message.body()) else {
        return Err(anyhow!("received a message with weird `body`!"));
    };
    let Some(http_request) = server_request.request() else {
        return Err(anyhow!("received a WebSocket message, skipping"));
    };
    if http_request.method().unwrap() != http::Method::PUT {
        return Err(anyhow!("received a non-PUT HTTP request, skipping"));
    }
    let Some(body) = get_blob() else {
        return Err(anyhow!(
            "received a PUT HTTP request with no body, skipping"
        ));
    };
    http::send_response(http::StatusCode::OK, None, vec![]);
    println!(
        "{:?}",
        serde_json::from_slice::<serde_json::Value>(&body.bytes)
    );
    Ok(())
}

call_init!(init);
fn init(_our: Address) {
    println!("begin");

    http::bind_http_path("/", false, false).unwrap();

    loop {
        match await_message() {
            Ok(message) => {
                if message.source().process == "http_server:distro:sys" {
                    if let Err(e) = handle_http_message(&message) {
                        println!("{e}");
                    }
                }
            }
            Err(_send_error) => println!("got send error!"),
        }
    }
}
}

Full example package.

WebSockets

WebSockets Client

The Kinode process:

#![allow(unused)]
fn main() {
/// Simple example of using the WebSockets client.
/// Usage:
/// ```
/// # Start node.
/// kit f
///
/// # Start WS server from a new terminal.
/// ./ws_client/ws_server.py
///
/// # Start package from a new terminal.
/// kit bs ws_client
/// ```
use anyhow::{anyhow, Result};

use kinode_process_lib::{
    await_message, call_init, get_blob, http, println, Address, LazyLoadBlob, Message,
};
#[cfg(feature = "test")]
use kinode_process_lib::{OnExit, Request};

wit_bindgen::generate!({
    path: "target/wit",
    world: "process-v0",
});

const WS_URL: &str = "ws://localhost:8765";
const CONNECTION: u32 = 0;

fn handle_http_message(message: &Message, connection: &u32) -> Result<()> {
    match serde_json::from_slice::<http::client::HttpClientRequest>(message.body())? {
        http::client::HttpClientRequest::WebSocketClose { channel_id } => {
            assert_eq!(*connection, channel_id);
        }
        http::client::HttpClientRequest::WebSocketPush {
            channel_id,
            message_type,
        } => {
            assert_eq!(*connection, channel_id);
            if message_type == http::client::WsMessageType::Close {
                println!("got Close push");
                return Ok(());
            }

            assert_eq!(message_type, http::client::WsMessageType::Text);

            let Some(blob) = get_blob() else {
                return Err(anyhow!("got WebSocketPush with no blob"));
            };
            println!("Received from server: {:?}", String::from_utf8(blob.bytes));

            http::client::send_ws_client_push(
                connection.clone(),
                http::client::WsMessageType::Text,
                LazyLoadBlob {
                    mime: Some("application/json".to_string()),
                    bytes: serde_json::to_vec("Hello from client").unwrap(),
                },
            );
        }
    }
    Ok(())
}

fn talk_to_ws() -> Result<()> {
    let connection = CONNECTION;
    http::client::open_ws_connection(WS_URL.to_string(), None, connection)?;

    match await_message() {
        Ok(message) => {
            if message.source().process == "http_client:distro:sys" {
                if let Err(e) = handle_http_message(&message, &connection) {
                    println!("{e}");
                }
            }
        }
        Err(_send_error) => println!("got send error!"),
    }
    Ok(())
}

#[cfg(feature = "test")]
fn talk_to_ws_test() -> Result<()> {
    println!("in test");
    let message = await_message()?;
    let parent_address = message.source();
    println!("got parent {parent_address:?}");

    match talk_to_ws() {
        Ok(_) => {}
        Err(e) => println!("error talking to ws: {e}"),
    }

    Request::to(parent_address)
        .body(serde_json::to_vec(&Ok::<(), ()>(())).unwrap())
        .send()
        .unwrap();
    OnExit::None.set().unwrap();
    println!("done");

    Ok(())
}

call_init!(init);
fn init(our: Address) {
    println!("{}: begin", our.process());

    #[cfg(not(feature = "test"))]
    match talk_to_ws() {
        Ok(_) => {}
        Err(e) => println!("error talking to ws: {e}"),
    }

    #[cfg(feature = "test")]
    match talk_to_ws_test() {
        Ok(_) => {}
        Err(e) => println!("error talking to ws: {e}"),
    }
}
}

An example WS server:

#!/usr/bin/env python3

import asyncio
import websockets

async def ws_handler(websocket, path, shutdown_event):
    try:
        await websocket.send("ack client connection")

        response = await websocket.recv()
        print(f"Received response from client: {response}")
    finally:
        await websocket.close()
        shutdown_event.set()

async def main():
    shutdown_event = asyncio.Event()

    async with websockets.serve(lambda ws, path: ws_handler(ws, path, shutdown_event), "localhost", 8765):
        print("Server started at ws://localhost:8765")

        await shutdown_event.wait()

        print("Shutting down server.")

if __name__ == '__main__':
    asyncio.run(main())

Full example package & client.

WebSockets Server

The Kinode process:

#![allow(unused)]
fn main() {
/// Simple example of running a WebSockets server.
/// Usage:
/// ```
/// # Start node.
/// kit f
///
/// # Start package from a new terminal.
/// kit bs ws_server
///
/// # Connect from WS client script.
/// ./ws_server/ws_client.py
/// ```
use anyhow::{anyhow, Result};

use kinode_process_lib::{
    await_message, call_init, get_blob, http, println, Address, LazyLoadBlob, Message,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "process-v0",
});

const WS_PATH: &str = "/";

fn handle_http_message(
    _our: &Address,
    message: &Message,
    connection: &mut Option<u32>,
) -> Result<()> {
    match serde_json::from_slice::<http::HttpServerRequest>(message.body())? {
        http::HttpServerRequest::Http(_) => {
            return Err(anyhow!("unexpected HTTP request"));
        }
        http::HttpServerRequest::WebSocketOpen { path, channel_id } => {
            assert_eq!(path, WS_PATH);
            assert_eq!(*connection, None);

            *connection = Some(channel_id);

            http::send_ws_push(
                channel_id,
                http::WsMessageType::Text,
                LazyLoadBlob {
                    mime: Some("application/json".to_string()),
                    bytes: serde_json::to_vec("ack client connection").unwrap(),
                },
            );
        }
        http::HttpServerRequest::WebSocketClose(channel_id) => {
            assert_eq!(*connection, Some(channel_id));

            *connection = None;
        }
        http::HttpServerRequest::WebSocketPush {
            channel_id,
            message_type,
        } => {
            assert_eq!(*connection, Some(channel_id));
            if message_type == http::WsMessageType::Close {
                println!("got Close push");
                return Ok(());
            }

            assert_eq!(message_type, http::WsMessageType::Text);

            let Some(blob) = get_blob() else {
                return Err(anyhow!("got WebSocketPush with no blob"));
            };
            println!("got Text from WS: {:?}", String::from_utf8(blob.bytes));
        }
    }
    Ok(())
}

call_init!(init);
fn init(our: Address) {
    println!("begin");

    let mut connection: Option<u32> = None;
    http::bind_ws_path(WS_PATH, false, false).unwrap();

    loop {
        match await_message() {
            Ok(message) => {
                if message.source().process == "http_server:distro:sys" {
                    if let Err(e) = handle_http_message(&our, &message, &mut connection) {
                        println!("{e}");
                    }
                }
            }
            Err(_send_error) => println!("got send error!"),
        }
    }
}
}

An example WS client:

#!/usr/bin/env python3

import asyncio
import websockets

async def connect_websocket(
    uri="ws://localhost:8080/ws_server:ws_server:template.os",
    max_retries=5,
    delay_secs=0.5,
):
    attempt = 0
    while attempt < max_retries:
        try:
            return await websockets.connect(uri, ping_interval=None)
        except (
            websockets.ConnectionClosedError,
            websockets.InvalidURI,
            websockets.InvalidStatusCode,
        ) as e:
            attempt += 1
            await asyncio.sleep(delay_secs)

    raise Exception("Max retries exceeded, unable to connect.")

async def websocket_client():
    websocket = await connect_websocket()

    message = await websocket.recv()
    print(f"Received from server: {message}")

    response = "Hello from client"
    await websocket.send(response)
    print(f"Sent to server: {response}")

    websocket.close()

def main():
    asyncio.run(websocket_client())

if __name__ == "__main__":
    main()

Full example package & client.

WebSockets Server with Reply Type

One constraint of Kinode's default WebSockets server Push is that it breaks the Request/Response pairing. This is because the server cannot specify it expects a Response back: all Pushes are Requests.

Use the following pattern to allow the WebSocket client to reply with a Response:

The Kinode process:

#![allow(unused)]
fn main() {
/// Simple example of running a WebSockets server, specifying reply type as Response.
/// Usage:
/// ```
/// # Start node.
/// kit f
///
/// # Start package from a new terminal.
/// kit bs ws_server_with_reply
///
/// # Connect from WS client script.
/// ./ws_server/ws_client.py
/// ```
use anyhow::{anyhow, Result};

use kinode_process_lib::kernel_types::MessageType;
use kinode_process_lib::{
    await_message, call_init, get_blob, http, println, Address, LazyLoadBlob, Message, Request,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "process-v0",
});

const WS_PATH: &str = "/";

fn handle_http_message(
    _our: &Address,
    message: &Message,
    connection: &mut Option<u32>,
) -> Result<()> {
    match serde_json::from_slice::<http::HttpServerRequest>(message.body())? {
        http::HttpServerRequest::Http(_) => {
            return Err(anyhow!("unexpected HTTP request"));
        }
        http::HttpServerRequest::WebSocketOpen { path, channel_id } => {
            assert_eq!(path, WS_PATH);
            assert_eq!(*connection, None);

            *connection = Some(channel_id.clone());

            Request::to("our@http_server:distro:sys".parse::<Address>()?)
                .body(serde_json::to_vec(
                    &http::HttpServerAction::WebSocketExtPushOutgoing {
                        channel_id,
                        message_type: http::WsMessageType::Binary,
                        desired_reply_type: MessageType::Response,
                    },
                )?)
                .expects_response(15)
                .blob(LazyLoadBlob {
                    mime: Some("application/json".to_string()),
                    bytes: rmp_serde::to_vec_named("ack client connection").unwrap(),
                })
                .send()?;
        }
        http::HttpServerRequest::WebSocketClose(channel_id) => {
            assert_eq!(*connection, Some(channel_id));

            *connection = None;
        }
        http::HttpServerRequest::WebSocketPush {
            channel_id,
            message_type,
        } => {
            assert_eq!(*connection, Some(channel_id));
            if message_type == http::WsMessageType::Close {
                println!("got Close push");
                return Ok(());
            }

            assert_eq!(message_type, http::WsMessageType::Binary);

            let Some(blob) = get_blob() else {
                return Err(anyhow!("got WebSocketPush with no blob"));
            };
            println!(
                "got Text from WS: {:?}",
                rmp_serde::from_slice::<String>(&blob.bytes)
            );
        }
    }
    Ok(())
}

call_init!(init);
fn init(our: Address) {
    println!("begin");

    let mut connection: Option<u32> = None;
    http::bind_ws_path(WS_PATH, false, false).unwrap();

    loop {
        match await_message() {
            Ok(message) => {
                if message.source().process == "http_server:distro:sys" {
                    if let Err(e) = handle_http_message(&our, &message, &mut connection) {
                        println!("{e}");
                    }
                }
            }
            Err(_send_error) => println!("got send error!"),
        }
    }
}
}

An example WS client:

#!/usr/bin/env python3

import asyncio
import msgpack
import websockets

async def connect_websocket(
    uri="ws://localhost:8080/ws_server_with_reply:ws_server_with_reply:template.os",
    max_retries=5,
    delay_secs=0.5,
):
    attempt = 0
    while attempt < max_retries:
        try:
            return await websockets.connect(uri, ping_interval=None)
        except (
            websockets.ConnectionClosedError,
            websockets.InvalidURI,
            websockets.InvalidStatusCode,
        ) as e:
            attempt += 1
            await asyncio.sleep(delay_secs)

    raise Exception("Max retries exceeded, unable to connect.")

async def websocket_client():
    websocket = await connect_websocket()

    message = await websocket.recv()
    message = msgpack.unpackb(message, raw=False)
    message = message["WebSocketExtPushData"]
    m = msgpack.unpackb(bytes(message["blob"]), raw=False)
    print(f"Received from server: {m}")

    response = "Hello from client"
    response = msgpack.packb(response, use_bin_type=True)
    await websocket.send(response)

    websocket.close()

def main():
    asyncio.run(websocket_client())

if __name__ == "__main__":
    main()

Full example package & client.

You can find this pattern used in Kinode Extensions.

Get Help: