Modbus Server

Viewpoint

The uModbus server code is built with routing in mind: routing a request, or a group of requests, to a callback is easy. This contrasts with other Modbus implementations, which often focus on reading from and writing to a data store.

Because of this difference in viewpoint, uModbus has no notion of the Modbus data model (discrete inputs, coils, input registers and holding registers) or of the read/write properties of those tables.
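
Since the data model is left to the callbacks, a server that wants the four classic tables has to keep them itself. The following is a minimal sketch in plain Python that does not use uModbus at all; the names READ_CODES, WRITE_CODES, tables, read and write are hypothetical. Only the function code numbers come from the Modbus specification.

```python
from collections import defaultdict

# Function codes from the Modbus specification, mapped to the table they
# address. The table names are illustrative labels, not uModbus identifiers.
READ_CODES = {
    1: 'coils',
    2: 'discrete_inputs',
    3: 'holding_registers',
    4: 'input_registers',
}
WRITE_CODES = {
    5: 'coils',               # Write Single Coil
    15: 'coils',              # Write Multiple Coils
    6: 'holding_registers',   # Write Single Register
    16: 'holding_registers',  # Write Multiple Registers
}

# One address -> value mapping per table.
tables = defaultdict(lambda: defaultdict(int))


def read(function_code, address):
    """Return the value stored for address in the table the code selects."""
    return tables[READ_CODES[function_code]][address]


def write(function_code, address, value):
    """Store value; discrete inputs and input registers have no write code."""
    tables[WRITE_CODES[function_code]][address] = value


write(5, 3, 1)    # set coil 3
write(6, 3, 42)   # set holding register 3
print(read(1, 3), read(3, 3), read(2, 3))  # prints 1 42 0
```

In this sketch the read-only tables are read-only simply because no write function code maps to them.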

Routing

The routing system was inspired by Flask. Like Flask, uModbus requires a global app or server. This server contains a route map, and routes are added to it with the route decorator.
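
To make the idea of a route map concrete, here is a toy version in plain Python. It is only a sketch of the concept and is not how uModbus implements routing; the RouteMap class and its match method are hypothetical names chosen for illustration.

```python
class RouteMap:
    """Toy route map: a list of (rule, callback) pairs searched in order."""

    def __init__(self):
        self._rules = []

    def route(self, slave_ids, function_codes, addresses):
        """Decorator that registers a callback for the given combinations."""
        def decorator(callback):
            rule = (set(slave_ids), set(function_codes), set(addresses))
            self._rules.append((rule, callback))
            return callback
        return decorator

    def match(self, slave_id, function_code, address):
        """Return the first callback whose rule covers the request, or None."""
        for (ids, codes, addrs), callback in self._rules:
            if slave_id in ids and function_code in codes and address in addrs:
                return callback
        return None


routes = RouteMap()


@routes.route(slave_ids=[1], function_codes=[1, 2], addresses=range(0, 10))
def read_value(slave_id, function_code, address):
    return address * 2  # placeholder value for the demonstration


handler = routes.match(slave_id=1, function_code=1, address=4)
print(handler(1, 1, 4))                                       # prints 8
print(routes.match(slave_id=1, function_code=1, address=10))  # prints None
```

A request is dispatched only when its slave id, function code and address all fall inside one registered rule; anything else has no handler.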

The following examples implement a very simple data store for 10 addresses (0 through 9).

Modbus TCP example

#!/usr/bin/env python
# scripts/examples/simple_data_store.py
import logging
from socketserver import TCPServer
from collections import defaultdict

from umodbus import conf
from umodbus.server.tcp import RequestHandler, get_server
from umodbus.utils import log_to_stream

# Add stream handler to logger 'uModbus'.
log_to_stream(level=logging.DEBUG)

# A very simple data store which maps addresses against their values.
data_store = defaultdict(int)

# Enable values to be signed (default is False).
conf.SIGNED_VALUES = True

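# Allow the address to be reused so the server can be restarted quickly.
TCPServer.allow_reuse_address = True
# 502 is the standard Modbus TCP port; binding to it usually requires
# elevated privileges.
app = get_server(TCPServer, ('localhost', 502), RequestHandler)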


# Function codes 1 and 2 are Read Coils and Read Discrete Inputs.
@app.route(slave_ids=[1], function_codes=[1, 2], addresses=list(range(0, 10)))
def read_data_store(slave_id, function_code, address):
    """Return value of address."""
    return data_store[address]


# Function codes 5 and 15 are Write Single Coil and Write Multiple Coils.
@app.route(slave_ids=[1], function_codes=[5, 15], addresses=list(range(0, 10)))
def write_data_store(slave_id, function_code, address, value):
    """Set value for address."""
    data_store[address] = value

if __name__ == '__main__':
    try:
        app.serve_forever()
    finally:
        app.shutdown()
        app.server_close()
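
To exercise the read route of the TCP example, a client has to send a Read Coils request addressed to unit 1. As a rough sketch that is independent of uModbus, the bytes of such a request can be assembled with struct following the Modbus TCP framing (MBAP header followed by the PDU). The helper name read_coils_request and the transaction id 1 are assumptions made for illustration.

```python
import struct


def read_coils_request(transaction_id, unit_id, starting_address, quantity):
    """Build a Modbus TCP Read Coils (function code 1) request."""
    # PDU: function code (1 byte), starting address and quantity (2 bytes each).
    pdu = struct.pack('>BHH', 1, starting_address, quantity)
    # MBAP header: transaction id, protocol id (always 0), number of bytes
    # that follow the length field (unit id + PDU), unit id.
    mbap = struct.pack('>HHHB', transaction_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu


request = read_coils_request(transaction_id=1, unit_id=1,
                             starting_address=0, quantity=10)
print(request.hex())  # prints 00010000000601010000000a
```

The unit id, function code and address range in this request are the values the read route is registered for.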

Modbus RTU example

#!/usr/bin/env python
from serial import Serial
from collections import defaultdict

from umodbus.server.serial import get_server
from umodbus.server.serial.rtu import RTUServer

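# Open the serial port and set a read timeout of 10 seconds.
s = Serial('/dev/ttyS1')
s.timeout = 10

# A very simple data store which maps addresses against their values.
data_store = defaultdict(int)
app = get_server(RTUServer, s)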


# Function codes 1 and 2 are Read Coils and Read Discrete Inputs.
@app.route(slave_ids=[1], function_codes=[1, 2], addresses=list(range(0, 10)))
def read_data_store(slave_id, function_code, address):
    """Return value of address."""
    return data_store[address]


# Function codes 5 and 15 are Write Single Coil and Write Multiple Coils.
@app.route(slave_ids=[1], function_codes=[5, 15], addresses=list(range(0, 10)))
def write_data_store(slave_id, function_code, address, value):
    """Set value for address."""
    data_store[address] = value

if __name__ == '__main__':
    try:
        app.serve_forever()
    finally:
        app.shutdown()
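
On a serial line the same request is framed differently: an RTU frame is the slave id, the PDU and a CRC-16 with the low byte sent first. The sketch below builds such a frame in plain Python without uModbus; crc16_modbus and rtu_frame are hypothetical helper names, and the CRC routine is the standard bitwise CRC-16/MODBUS rather than anything taken from the library.

```python
import struct


def crc16_modbus(data):
    """Compute CRC-16/MODBUS (initial value 0xFFFF, reflected poly 0xA001)."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def rtu_frame(slave_id, pdu):
    """Prefix the slave id and append the CRC, low byte first."""
    body = bytes([slave_id]) + pdu
    return body + struct.pack('<H', crc16_modbus(body))


# Read Coils (function code 1), starting address 0, quantity 10.
pdu = struct.pack('>BHH', 1, 0, 10)
frame = rtu_frame(1, pdu)
print(len(frame))           # prints 8
print(crc16_modbus(frame))  # prints 0
```

Running the CRC over a complete frame, checksum included, gives 0, which is a convenient way for a receiver to validate a frame.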