Last week I tried to read and write data to and from a SICK FlexiSoft safety controller which has a Modbus/TCP gateway.
Because I had a lot of trouble getting that up and running, I decided to write a few lines about how I managed to solve my problems.
I connected the safety controller to my computer using a normal 100 Mbit switch. At first I made the mistake of using port 9100 instead of port 502, which is the standard port for Modbus/TCP. I made that mistake because the SICK FlexiSoft Designer configuration shows port 9100 but makes no mention of port 502.
I decided not to use a ready-made Modbus library like pyModbus because I wanted to learn something.
So here is my code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Set to True to print every sent and received frame as a list of hex bytes.
DEBUG = False

import socket
from struct import pack

class Modbus:
    """Minimal Modbus/TCP client for reading and writing holding registers.

    Only function codes 0x03 (read holding registers) and 0x10 (write
    multiple registers) are implemented.  Register addresses passed to the
    public methods are 1-based; they are sent on the wire as ``reg_addr - 1``.
    Register contents are exchanged with the caller as flat lists of byte
    values with the low byte of each 16-bit register first, i.e. the
    big-endian wire order is swapped per register.

    Frame dumps are printed when the module-level ``DEBUG`` flag is true.
    """

    # Size of the MBAP header: transaction id, protocol id, length, unit id.
    _MBAP_SIZE = 7

    def __init__(self, host, port):
        """Remember the gateway address; no connection is opened yet."""
        self.host = host
        self.port = port
        self.socket = None

    def connect(self):
        """Open the TCP connection if it is not already open.

        A connection failure is reported on stdout and leaves ``self.socket``
        set to ``None``; it is not re-raised.
        """
        if self.socket is None:
            try:
                self.socket = socket.create_connection((self.host, self.port))
            except socket.error as msg:
                print("Connection to {0}:{1} failed:\n{2}".format(self.host, self.port, msg))

    def disconnect(self):
        """Close the TCP connection if one is open."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes, looping over short TCP reads."""
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                # recv() returning b'' means the peer closed the connection.
                raise ConnectionError(
                    "connection closed with {0} of {1} bytes still missing".format(remaining, count))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _request(self, tid, uid, fcode, payload):
        """Send one request and return ``(mbap_header, pdu)`` of the reply.

        Raises ConnectionError when not connected or when the peer closes the
        connection, and IOError when the reply is malformed, carries a Modbus
        exception, or answers a different function code.
        """
        if self.socket is None:
            raise ConnectionError(
                "not connected to {0}:{1}; call connect() first".format(self.host, self.port))

        pdu = pack('>B', fcode) + payload
        # The MBAP length field counts the unit id plus the PDU.
        msg = pack('>HHHB', tid, 0, len(pdu) + 1, uid) + pdu

        if DEBUG:
            print("sent:\n{0}".format([hex(b) for b in msg]))

        # sendall() keeps writing until the whole frame has been handed over.
        self.socket.sendall(msg)

        header = self._recv_exact(self._MBAP_SIZE)
        length = (header[4] << 8) | header[5]
        if length < 2:
            raise IOError("malformed Modbus response: length field is {0}".format(length))
        # The unit id was already read as part of the header.
        body = self._recv_exact(length - 1)

        if body[0] == (fcode | 0x80):
            # Exception responses echo the function code with the top bit set,
            # followed by a one-byte exception code.
            code = body[1] if len(body) > 1 else None
            raise IOError(
                "Modbus exception response to function 0x{0:02x}: exception code {1}".format(fcode, code))
        if body[0] != fcode:
            raise IOError(
                "unexpected function code 0x{0:02x} in response to 0x{1:02x}".format(body[0], fcode))
        return header, body

    def read_holding_registers(self, reg_addr, reg_cnt, tid=0, uid=0):
        """Read ``reg_cnt`` holding registers starting at 1-based ``reg_addr``.

        Returns a flat list of ints, two per register, low byte first.
        ``tid`` and ``uid`` are the transaction and unit identifiers placed
        in the request header.  ``struct.error`` is raised if an argument
        does not fit its field (for example ``reg_addr`` of 0).
        """
        payload = pack('>HH', reg_addr - 1, reg_cnt)
        header, body = self._request(tid, uid, 0x03, payload)

        if len(body) < 2:
            raise IOError("malformed Modbus response: byte count is missing")
        byte_cnt = body[1]
        _data = body[2:2 + byte_cnt]
        if len(_data) != byte_cnt:
            raise IOError(
                "truncated Modbus response: expected {0} data bytes, got {1}".format(byte_cnt, len(_data)))

        if DEBUG:
            print("recieved header:\n{0}".format([hex(b) for b in header + body[:2]]))
            print("receive {0} bytes of data".format(byte_cnt))
            print("recieved data:\n{0}".format([hex(b) for b in _data]))

        data = []
        # rotate big endian 16bit values
        for i in range(0, len(_data) - 1, 2):
            data.append(_data[i + 1])
            data.append(_data[i])
        return data

    def write_multiple_registers(self, reg_addr, data, tid=0, uid=0):
        """Write registers starting at 1-based ``reg_addr``.

        ``data`` is a flat sequence of byte values, two per register, low
        byte first.  Returns the complete response frame as a list of hex
        strings.  Raises ValueError if ``data`` does not contain an even
        number of bytes, and ``struct.error`` if a value does not fit its
        field.
        """
        if len(data) % 2:
            # An odd byte would make the byte count disagree with the payload.
            raise ValueError(
                "data must contain an even number of bytes, got {0}".format(len(data)))

        # rotate big endian 16bit values
        _data = b"".join(pack('>BB', data[i + 1], data[i]) for i in range(0, len(data), 2))
        payload = pack('>HHB', reg_addr - 1, len(data) // 2, len(data)) + _data

        header, body = self._request(tid, uid, 0x10, payload)
        response = header + body

        if DEBUG:
            print("recieved data:\n{0}".format([hex(b) for b in response]))
        return [hex(b) for b in response]
So far I’ve only implemented read_holding_registers
and write_multiple_registers
because the gateway only supports these two commands and one other.
For me those two functions are everything I need, so I will not implement any other commands.
I use the code to read status information from the controller and write bits to trigger actions.