1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
#!/usr/bin/env python
#-*- coding: utf-8 -*-
# Debug switch: when True, the Modbus class prints every frame it sends and
# every header/payload it receives to stdout.
#DEBUG = True
DEBUG = False
import socket
from struct import pack
class Modbus:
    """Minimal Modbus/TCP client for holding registers.

    Implements function 0x03 (read holding registers) and function 0x10
    (write multiple registers) over a single TCP connection.

    Callers pass register numbers one higher than the address placed in the
    request (``reg_addr - 1`` is sent). Register contents are exchanged with
    the caller as a flat sequence of byte values in which the two bytes of
    every 16-bit register are swapped relative to the order on the wire.
    """

    def __init__(self, host, port):
        """Store the endpoint; the connection is opened by connect()."""
        self.host = host
        self.port = port
        self.socket = None

    def connect(self):
        """Open the TCP connection unless one is already open.

        A failed attempt is reported on stdout and leaves ``self.socket``
        set to ``None``; the error is not propagated to the caller.
        """
        if self.socket is not None:
            return
        try:
            self.socket = socket.create_connection((self.host, self.port))
        except socket.error as msg:
            print("Connection to {0}:{1} failed:\n{2}".format(self.host, self.port, msg))

    def disconnect(self):
        """Close the connection if one is open."""
        # Clear the attribute first so the object is reusable even when
        # close() itself raises.
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()

    @staticmethod
    def _build_frame(tid, uid, pdu):
        """Return *pdu* prefixed with the 7-byte MBAP header."""
        # The length field counts the unit id byte plus the PDU.
        return pack('>HHHB', tid, 0, len(pdu) + 1, uid) + pdu

    def _recv_exact(self, count):
        """Read exactly *count* bytes from the socket.

        recv() may legitimately return fewer bytes than requested, so keep
        reading until the full amount has arrived.

        Raises:
            ConnectionError: the peer closed the connection early.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    "connection closed with {0} of {1} bytes missing".format(remaining, count))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def read_holding_registers(self, reg_addr, reg_cnt, tid=0, uid=0):
        """Read *reg_cnt* holding registers starting at *reg_addr*.

        Args:
            reg_addr: register number; ``reg_addr - 1`` is sent as the address.
            reg_cnt: number of 16-bit registers to read.
            tid: transaction identifier placed in the MBAP header.
            uid: unit identifier placed in the MBAP header.

        Returns:
            A list of byte values (ints) with the two bytes of every register
            swapped relative to the wire order. A trailing unpaired byte in
            the response is dropped.

        Raises:
            IOError: the device answered with an exception response.
            ConnectionError: the connection closed before the reply was complete.
        """
        request = self._build_frame(tid, uid, pack('>BHH', 0x03, reg_addr - 1, reg_cnt))
        if DEBUG:
            print("sent:\n{0}".format([hex(b) for b in request]))
        self.socket.sendall(request)

        # MBAP header (7) + function code (1) + byte count / exception code (1)
        header = self._recv_exact(9)
        if DEBUG:
            print("recieved header:\n{0}".format([hex(b) for b in header]))
        if header[7] & 0x80:
            # In an exception response byte 8 is the exception code, not a
            # byte count; reading that many bytes would block.
            raise IOError(
                "Modbus exception response: function 0x{0:02x}, exception code {1}".format(
                    header[7], header[8]))
        if DEBUG:
            print("receive {0} bytes of data".format(header[8]))

        payload = self._recv_exact(header[8])
        if DEBUG:
            print("recieved data:\n{0}".format([hex(b) for b in payload]))

        # rotate big endian 16bit values
        swapped = []
        for i in range(0, len(payload) - 1, 2):
            swapped.append(payload[i + 1])
            swapped.append(payload[i])
        return swapped

    def write_multiple_registers(self, reg_addr, data, tid=0, uid=0):
        """Write *data* to consecutive holding registers starting at *reg_addr*.

        Args:
            reg_addr: register number; ``reg_addr - 1`` is sent as the address.
            data: indexable sequence of byte values, two per register, in the
                same swapped byte order that read_holding_registers returns.
            tid: transaction identifier placed in the MBAP header.
            uid: unit identifier placed in the MBAP header.

        Returns:
            The raw response as a list of ``hex()`` strings, one per byte:
            12 entries for a normal reply, 9 for an exception reply.

        Raises:
            ValueError: *data* has an odd number of bytes, which cannot be
                split into whole registers.
            ConnectionError: the connection closed before the reply was complete.
        """
        if len(data) % 2:
            raise ValueError(
                "data must hold an even number of bytes, got {0}".format(len(data)))

        # rotate big endian 16bit values
        payload = b''.join(
            pack('>BB', data[i + 1], data[i]) for i in range(0, len(data), 2))
        pdu = pack('>BHHB', 0x10, reg_addr - 1, len(data) // 2, len(data)) + payload
        request = self._build_frame(tid, uid, pdu)
        if DEBUG:
            print("sent:\n{0}".format([hex(b) for b in request]))
        self.socket.sendall(request)

        # An exception reply is 9 bytes long, a normal reply 12; read the
        # common prefix first so neither case waits for bytes that never come.
        response = self._recv_exact(9)
        if not response[7] & 0x80:
            response += self._recv_exact(3)
        if DEBUG:
            print("recieved data:\n{0}".format([hex(b) for b in response]))
        return [hex(b) for b in response]
|