Walkthrough of the example code provided with linuxcnc-grpc.

Overview#

Examples are provided in four languages, each implementing the same functionality:

ExampleDescription
get_statusPoll and display machine status
stream_statusReal-time status streaming
jog_axisInteractive jogging with keyboard
mdi_commandExecute G-code via MDI
hal_queryQuery HAL pins, signals, parameters
upload_fileUpload, list, and delete G-code files

Directory Structure#

examples/
├── python/
│   ├── get_status.py
│   ├── stream_status.py
│   ├── jog_axis.py
│   ├── mdi_command.py
│   ├── hal_query.py
│   └── upload_file.py
├── go/
│   └── cmd/
│       ├── get_status/
│       ├── stream_status/
│       ├── jog_axis/
│       ├── mdi_command/
│       ├── hal_query/
│       └── upload_file/
├── node/
│   ├── get_status.ts
│   ├── stream_status.ts
│   ├── jog_axis.ts
│   ├── mdi_command.ts
│   ├── hal_query.ts
│   └── upload_file.ts
├── rust/
│   ├── Cargo.toml
│   └── src/bin/
│       ├── get_status.rs
│       ├── stream_status.rs
│       ├── jog_axis.rs
│       ├── mdi_command.rs
│       ├── hal_query.rs
│       └── upload_file.rs
└── README.md

Running Examples#

Prerequisites#

  1. LinuxCNC running (or use a mock server for testing)
  2. gRPC server running on the LinuxCNC machine
  3. Language-specific dependencies installed
cd examples/python

# Install dependencies (if using virtualenv)
pip install grpcio linuxcnc-grpc

# Run an example
python get_status.py --host localhost --port 50051
cd examples/go

# Download dependencies
go mod download

# Run an example
go run ./cmd/get_status --host localhost --port 50051
cd examples/node

# Install dependencies
npm install

# Run an example
npx tsx get_status.ts --host localhost --port 50051
cd examples/rust

# Build all examples
cargo build --release

# Run an example
cargo run --bin get_status -- --host localhost --port 50051

# Or run the built binary directly
./target/release/get_status --host localhost --port 50051

get_status#

The simplest example - polls the machine once and displays status.

What it demonstrates#

  • Connecting to the gRPC server
  • Calling GetStatus RPC
  • Parsing the LinuxCNCStatus response
  • Displaying task state, position, joints, spindles, and I/O

Key code (Python)#

# Connect
channel = grpc.insecure_channel(f"{host}:{port}")
stub = linuxcnc_pb2_grpc.LinuxCNCServiceStub(channel)

# Request status
status = stub.GetStatus(linuxcnc_pb2.GetStatusRequest())

# Access nested fields
print(f"Mode: {linuxcnc_pb2.TaskMode.Name(status.task.task_mode)}")
print(f"X: {status.position.actual_position.x:.4f}")

Output#

============================================================
LinuxCNC Status
============================================================

[Task]
  Mode:       MODE_MANUAL
  State:      STATE_ON
  Exec State: EXEC_DONE
  Interp:     INTERP_IDLE

[Position]
  X:     0.0000  Y:     0.0000  Z:     0.0000

[Trajectory]
  Enabled:    True
  Feed Rate:  100.0%
  Rapid Rate: 100.0%
  Velocity:   0.00

[Joints]
  Joint 0: [HE-] pos=    0.0000
  Joint 1: [HE-] pos=    0.0000
  Joint 2: [HE-] pos=    0.0000

stream_status#

Real-time status streaming instead of polling.

What it demonstrates#

  • Using StreamStatus server-streaming RPC
  • Setting update interval
  • Processing a continuous stream of updates
  • Calculating update rate

Key code#

# Start streaming with 100ms interval
request = linuxcnc_pb2.StreamStatusRequest(interval_ms=100)

for status in stub.StreamStatus(request):
    pos = status.position.actual_position
    print(f"\rX={pos.x:8.3f} Y={pos.y:8.3f} Z={pos.z:8.3f}", end="")
stream, err := client.StreamStatus(ctx, &pb.StreamStatusRequest{
    IntervalMs: 100,
})

for {
    status, err := stream.Recv()
    if err == io.EOF {
        break
    }
    pos := status.Position.ActualPosition
    fmt.Printf("X=%.3f Y=%.3f Z=%.3f\n", pos.X, pos.Y, pos.Z)
}

Output#

Streaming status updates (Ctrl+C to stop)...
[  1] X=  0.000 Y=  0.000 Z=  0.000 | Feed:100% Vel:  0.00
[  2] X=  0.000 Y=  0.000 Z=  0.000 | Feed:100% Vel:  0.00
[  3] X=  0.500 Y=  0.000 Z=  0.000 | Feed:100% Vel: 10.00
...

jog_axis#

Interactive jogging with keyboard controls.

What it demonstrates#

  • Sending state commands (ESTOP_RESET, ON)
  • Sending mode commands (MANUAL)
  • Jogging with JOG_CONTINUOUS and JOG_STOP
  • Incremental jogging with JOG_INCREMENT
  • Keyboard input handling

Key code (Python)#

def jog_start(stub, axis, velocity):
    cmd = linuxcnc_pb2.LinuxCNCCommand()
    cmd.serial = next_serial()
    cmd.jog.type = linuxcnc_pb2.JOG_CONTINUOUS
    cmd.jog.is_joint = False  # Axis mode (vs joint mode)
    cmd.jog.index = axis      # 0=X, 1=Y, 2=Z
    cmd.jog.velocity = velocity
    return stub.SendCommand(cmd)

def jog_stop(stub, axis):
    cmd = linuxcnc_pb2.LinuxCNCCommand()
    cmd.serial = next_serial()
    cmd.jog.type = linuxcnc_pb2.JOG_STOP
    cmd.jog.index = axis
    return stub.SendCommand(cmd)

Controls#

Keyboard Controls:
  Arrow keys: Jog X/Y axes
  Page Up/Down: Jog Z axis
  +/-: Adjust jog speed
  Space: Emergency stop
  Q: Quit

mdi_command#

Execute G-code commands via MDI (Manual Data Input).

What it demonstrates#

  • Checking and setting machine state
  • Checking and setting task mode
  • Sending MDI commands
  • Using WaitComplete to wait for execution
  • Interactive command loop

Key code (Python)#

def ensure_mdi_ready(client):
    """Ensure machine is ready for MDI commands."""
    status = client.get_status()

    # Reset E-stop if needed
    if status.task.task_state == linuxcnc_pb2.STATE_ESTOP:
        client.set_state(linuxcnc_pb2.STATE_ESTOP_RESET)

    # Power on
    if status.task.task_state != linuxcnc_pb2.STATE_ON:
        client.set_state(linuxcnc_pb2.STATE_ON)

    # Set MDI mode
    if status.task.task_mode != linuxcnc_pb2.MODE_MDI:
        client.set_mode(linuxcnc_pb2.MODE_MDI)

def execute_mdi(client, gcode):
    """Execute G-code and wait for completion."""
    cmd = linuxcnc_pb2.LinuxCNCCommand()
    cmd.serial = next_serial()
    cmd.mdi.command = gcode
    response = client.send_command(cmd)

    # Wait for completion
    response = client.wait_complete(cmd.serial, timeout=60.0)
    return response.status == linuxcnc_pb2.RCS_DONE

Usage#

# Single command
python mdi_command.py "G0 X10 Y10"

# Interactive mode
python mdi_command.py --interactive

Interactive session#

MDI> G0 X10 Y10
Executing: G0 X10 Y10
  Waiting for completion...
  Done.
Position: X=10.0000 Y=10.0000 Z=0.0000

MDI> G1 X20 F100
Executing: G1 X20 F100
  Waiting for completion...
  Done.
Position: X=20.0000 Y=10.0000 Z=0.0000

MDI> status
Position: X=20.0000 Y=10.0000 Z=0.0000

MDI> quit

hal_query#

Query HAL pins, signals, and parameters.

What it demonstrates#

  • Using HalService
  • Querying with glob patterns
  • Different query types (pins, signals, params, components)
  • Formatting HAL values

Key code (Python)#

# Create HAL service stub
hal_stub = hal_pb2_grpc.HalServiceStub(channel)

# Query pins matching pattern
request = hal_pb2.QueryPinsCommand(pattern="axis.*")
response = hal_stub.QueryPins(request)

for pin in response.pins:
    value = format_hal_value(pin.value)
    direction = hal_pb2.PinDirection.Name(pin.direction)
    print(f"{pin.name}: {value} ({direction})")

Usage#

# Query all axis pins
python hal_query.py --pins "axis.*"

# Query spindle signals
python hal_query.py --signals "spindle*"

# Query all motion parameters
python hal_query.py --params "motion.*"

# List all components
python hal_query.py --components "*"

Output#

=== HAL Pins matching "axis.x.*" ===
axis.x.pos-cmd: 10.500000 (HAL_OUT)
axis.x.pos-fb: 10.499823 (HAL_IN)
axis.x.vel-cmd: 0.000000 (HAL_OUT)
axis.x.homed: True (HAL_OUT)

=== HAL Signals matching "spindle*" ===
spindle-speed-out: 1200.000000
spindle-at-speed: True
spindle-on: True

upload_file#

Upload, list, and delete G-code files on the LinuxCNC machine.

What it demonstrates#

  • Using UploadFile to write G-code files remotely
  • Using ListFiles to browse the nc_files directory
  • Using DeleteFile to clean up uploaded files
  • File management error handling

Key code (Python)#

# Upload a G-code file
request = linuxcnc_pb2.UploadFileRequest(
    filename="my_part.ngc",
    content="G0 X10 Y10\nG1 Z-5 F100\nM2\n"
)
response = stub.UploadFile(request)
print(f"Written to: {response.path}")

# List files
list_response = stub.ListFiles(linuxcnc_pb2.ListFilesRequest())
for f in list_response.files:
    print(f"{f.name}: {f.size_bytes} bytes")

# Delete file
stub.DeleteFile(linuxcnc_pb2.DeleteFileRequest(filename="my_part.ngc"))

Usage#

# Upload a sample file
python upload_file.py --host localhost --port 50051

# Upload and then delete (cleanup)
python upload_file.py --cleanup

Output#

Uploading 'grpc_example.ngc'...
  Written to: /home/linuxcnc/linuxcnc/nc_files/grpc_example.ngc
  Size: 142 bytes

Listing files...
  Directory: /home/linuxcnc/linuxcnc/nc_files
  Name                               Size  Type
  ------------------------------ --------  ----
  grpc_example.ngc                    142  FILE
  other_program.ngc                   523  FILE
  projects                           4096  DIR

Testing with Mock Server#

For development without a real LinuxCNC installation, use the mock server:

# Start mock server
python tests/mock_server.py --port 50051

# Run examples against it
python examples/python/get_status.py --port 50051

The mock server simulates:

  • All status fields with realistic values
  • Command responses
  • Status streaming
  • HAL system status

Common Patterns#

Client Wrapper Class#

Many examples use a client wrapper for cleaner code:

class LinuxCNCClient:
    def __init__(self, host, port):
        self.channel = grpc.insecure_channel(f"{host}:{port}")
        self.stub = linuxcnc_pb2_grpc.LinuxCNCServiceStub(self.channel)
        self._serial = 0

    def _next_serial(self):
        self._serial += 1
        return self._serial

    def get_status(self):
        return self.stub.GetStatus(linuxcnc_pb2.GetStatusRequest())

    def send_command(self, cmd):
        cmd.serial = self._next_serial()
        cmd.timestamp = int(time.time() * 1e9)
        return self.stub.SendCommand(cmd)

Error Handling#

try:
    status = stub.GetStatus(request)
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.UNAVAILABLE:
        print("Server not available")
    elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        print("Request timed out")
    else:
        print(f"Error: {e.code()}: {e.details()}")

Streaming Timeouts#

The Rust streaming examples (stream_status, hal_query watch) use per-message timeouts to detect unresponsive servers. Each stream.next() call is wrapped with tokio::time::timeout:

use tokio::time::{timeout, Duration};

// 30-second timeout per message
match timeout(Duration::from_secs(30), stream.next()).await {
    Ok(Some(Ok(status))) => { /* process */ }
    Ok(Some(Err(e))) => { /* gRPC error */ }
    Ok(None) => { /* stream ended */ }
    Err(_) => { eprintln!("Timeout waiting for update"); break; }
}

This prevents indefinite hangs if the server becomes unresponsive.

Graceful Shutdown#

import signal

def signal_handler(sig, frame):
    print("\nShutting down...")
    channel.close()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)