22 from serial
import Serial
29 ESCAPED_CHARACTER = [
"7E",
"7D",
"11",
"13"]
32 API_FRAME_TX_TIMEOUT = 0.1
33 LOCAL_TX_STATUS_RETRY_COUNT = 1
34 LOCAL_RESPONSE_RETRY_COUNT = 1
35 REMOTE_RX_STATUS_RETRY_COUNT = 1
36 GET_FRAME_RETRY_COUNT = 1
37 IO_SAMPLE_RX_RETRY_COUNT = 1
41 "01":
"UNSUFFICIENT_MEMORY_FOR_PACKET",
42 "02":
"INVALID_COMMAND_CODE",
43 "03":
"INVALID_COMMAND_PARAMETER",
44 "04":
"TX_CCA_FAILURE",
49 "UNSUFFICIENT_MEMORY_FOR_PACKET":
"01",
50 "INVALID_COMMAND_CODE":
"02",
51 "INVALID_COMMAND_PARAMETER":
"03",
52 "TX_CCA_FAILURE":
"04",
85 "03":
"DIGITAL_INPUT",
86 "04":
"DIGITAL_OUTPUT_LOW",
87 "05":
"DIGITAL_OUTPUT_HIGH",
88 "0D":
"COUNTER_INPUT_1",
89 "0E":
"COUNTER_INPUT_2",
90 "0F":
"WAKEUP_INPUT_FALLING",
91 "10":
"WAKEUP_INPUT_RISING" 113 self.
ser.reset_input_buffer()
114 self.
ser.reset_output_buffer()
123 def add_hex2(self, hex1, hex2):
124 return hex(int(hex1, 16) + int(hex2, 16))
126 def sub_hex2(self,hex1, hex2):
127 return hex(int(hex1, 16) - int(hex2, 16))
129 def xor_hex(self, a, b):
130 return '%x' % (int(a, 16) ^ int(b, 16))
135 hv = hex(ch).replace(
'0x',
'')
139 return functools.reduce(
lambda x, y: x + y, lst)
141 def add_bytes(self,byte):
142 self.
bytes.append(byte)
147 def read_frame_infos(self):
153 start_byte = self.
ser.read()
154 if(start_byte == b
""):
155 return {
"length" : 0,
"frame_type" :
"00"}
157 self.
bytes.append(start_byte)
163 return {
"length" : 0,
"frame_type" :
"00"}
164 self.
bytes.append(cc)
174 length_hex += self.
toHex(cc)
176 length = int(length_hex, 16)
177 type_byte = self.
ser.read()
178 if(type_byte == b
""):
179 return {
"length" : 0,
"frame_type" :
"00"}
180 self.
bytes.append(type_byte)
181 type_hex = self.
toHex(type_byte).upper()
182 return {
"length" : length ,
"frame_type" : type_hex}
184 def read_rx_api(self, type = []):
187 length = infos[
"length"]
188 if (infos[
"frame_type"] == type
or type == [])
and length != 0:
190 byte = self.
ser.read()
192 return "ERROR : received less than length bytes" 193 self.
bytes.append(byte)
198 return "ERROR : received a Wrong frame !!! " 200 def filter_frame(self):
202 return "ERROR invalid Frame" 205 for j
in range(1, len(self.
bytes)):
223 if frame_type ==
"81":
225 elif frame_type ==
"83":
227 elif frame_type ==
"87":
229 elif frame_type ==
"88":
231 elif frame_type ==
"89":
233 elif frame_type ==
"8A":
235 elif frame_type ==
"8B":
237 elif frame_type ==
"8C":
239 elif frame_type ==
"8F":
241 elif frame_type ==
"97":
244 def send_local_at(self, frame_type, frame_id, at_command, at_parameter = ""):
245 if len(at_command) != 2:
246 return "Error : verify the Command AT !!!!" 249 ll = 4 + len(at_parameter) // 2
250 length = str(format(ll,
"04x"))
251 frame_local_at.append(length[0:2])
252 frame_local_at.append(length[2:4])
253 frame_local_at.append(frame_type)
254 frame_local_at.append(frame_id)
255 frame_local_at.append(self.
toHex(bytes(at_command.upper(), encoding =
"ascii"))[0:2])
256 frame_local_at.append(self.
toHex(bytes(at_command.upper(), encoding =
"ascii"))[2:4])
257 if len(at_parameter) > 0:
258 for i
in range(0, len(at_parameter), 2):
259 pp = at_parameter.upper()[i:i + 2]
260 frame_local_at.append(pp)
262 for i
in range(2, len(frame_local_at)):
263 checksum = self.
add_hex2(frame_local_at[i], checksum)
264 checksum = (self.
sub_hex2(
"FF", checksum[len(checksum) - 2 : len(checksum)])[2:4]).zfill(2)
265 frame_local_at.append(checksum)
268 tx_packet = binascii.unhexlify(
"".join(map(str, frame_local_at)))
269 self.
ser.write(tx_packet)
278 return self.
send_local_at(
"07", frame_id, at_command, at_parameter)
286 return self.
send_local_at(
"08", frame_id, at_command, at_parameter)
294 return self.
send_local_at(
"09", frame_id, at_command, at_parameter)
313 frame_tx_request = []
315 ll = 4 + len(data) // 2
317 ll = 5 + len(data) // 2
318 length = str(format(ll,
"04x"))
319 frame_tx_request.append(length[0:2])
320 frame_tx_request.append(length[2:4])
322 frame_tx_request.append(
"0F")
324 frame_tx_request.append(
"10")
325 frame_tx_request.append(frame_id)
326 frame_tx_request.append(destination_id[0:2])
327 frame_tx_request.append(destination_id[2:4])
329 frame_tx_request.append(options)
330 for i
in range(0, len(data), 2):
331 pp = data.upper()[i:i + 2]
332 frame_tx_request.append(pp)
334 for i
in range(2, len(frame_tx_request)):
335 checksum = self.
add_hex2(frame_tx_request[i], checksum)
336 checksum = (self.
sub_hex2(
"FF", checksum[len(checksum) - 2 : len(checksum)])[2:4]).zfill(2)
337 frame_tx_request.append(checksum)
339 tx_packet = binascii.unhexlify(
"".join(map(str, frame_tx_request)))
340 self.
ser.write(tx_packet)
359 def send_remote_at(self, frame_id, destination_id, options, at_command, at_parameter = ""):
360 if len(at_command) != 2:
361 return "Error : verify the Command AT !!!!" 364 ll = 7 + len(at_parameter) // 2
365 length = str(format(ll,
"04x"))
366 frame_at.append(length[0:2])
367 frame_at.append(length[2:4])
368 frame_at.append(
"17")
369 frame_at.append(frame_id)
370 frame_at.append(destination_id[0:2])
371 frame_at.append(destination_id[2:4])
372 frame_at.append(options)
373 frame_at.append(self.
toHex(bytes(at_command.upper(), encoding =
"ascii"))[0:2])
374 frame_at.append(self.
toHex(bytes(at_command.upper(), encoding =
"ascii"))[2:4])
375 if len(at_parameter) > 0:
376 for i
in range(0, len(at_parameter), 2):
377 pp = at_parameter.upper()[i:i + 2]
380 for i
in range(2, len(frame_at)):
381 checksum = self.
add_hex2(frame_at[i], checksum)
382 checksum = (self.
sub_hex2(
"FF", checksum[len(checksum) - 2 : len(checksum)])[2:4]).zfill(2)
383 frame_at.append(checksum)
385 tx_packet = binascii.unhexlify(
"".join(map(str, frame_at)))
386 self.
ser.write(tx_packet)
389 def get_frame_common_fields(self):
390 ss =
"".join(map(str, self.
fields))
391 self.
frame[
"DELIMITER"] = ss[0:2]
392 self.
frame[
"LENGTH"] = int(ss[2:6], 16)
393 self.
frame[
"FRAME_TYPE"] = ss[6:8]
423 if not "ERROR" in response:
456 if not "ERROR" in response:
462 def export_81_8F_frame(self, ss):
463 self.
frame[
"SOURCE_ADDRESS_DEC"] = str(int(ss[8:12], 16)).zfill(5)
464 self.
frame[
"SOURCE_ADDRESS_HEX"] = ss[8:12]
465 self.
frame[
"RSSI"] = int(ss[12:14], 16) - 256
466 self.
frame[
'OPTIONS'] = ss[14:16]
467 self.
frame[
'DATA'] = ss[16:len(ss) - 2]
468 self.
frame[
"CHECKSUM"] = ss[len(ss) - 2 : len(ss)]
489 if not "ERROR" in response:
513 if not "ERROR" in response:
537 if not "ERROR" in response:
543 def export_87_88_89_frame(self, ss):
544 self.
frame[
"FRAME_ID"] = ss[8:10]
545 self.
frame[
"AT_COMMAND"] = binascii.unhexlify(ss[10:14]).decode(
"ascii")
546 self.
frame[
"STATUS"] = ss[14:16]
548 self.
frame[
"AT_PARAMETER_HEX"] = []
549 self.
frame[
"AT_PARAMETER"] = []
551 self.
frame[
"AT_PARAMETER_HEX"] = ss[16:len(ss) - 2]
552 self.
frame[
"AT_PARAMETER"] = binascii.unhexlify(ss[16:len(ss) - 2])
553 self.
frame[
"CHECKSUM"] = ss[len(ss) - 2 : len(ss)]
571 if not "ERROR" in response:
577 def export_8A_frame(self, ss):
578 self.
frame[
"STATUS"] = ss[8:10]
579 self.
frame[
"CHECKSUM"] = ss[10:12]
603 if not "ERROR" in response:
609 def export_8B_frame(self, ss):
610 self.
frame[
"FRAME_ID"] = ss[8:10]
611 self.
frame[
"DESTINATION_ADDRESS_DEC"] = str(int(ss[10:14], 16)).zfill(5)
612 self.
frame[
"DESTINATION_ADDRESS_HEX"] = ss[10:14]
613 self.
frame[
"TX_RETRY_COUNT"] = int(ss[14:16], 16)
614 self.
frame[
"STATUS"] = ss[16:18]
615 self.
frame[
"RESERVE"] = ss[18:20]
616 self.
frame[
"CHECKSUM"] = ss[20:22]
638 if not "ERROR" in response:
644 def export_8C_frame(self, ss):
645 self.
frame[
"SOURCE_ADDRESS_DEC"] = str(int(ss[8:12], 16)).zfill(5)
646 self.
frame[
"SOURCE_ADDRESS_HEX"] = ss[8:12]
647 self.
frame[
"RSSI"] = int(ss[12:14], 16) - 256
648 self.
frame[
"OPTIONS"] = ss[14:16]
649 self.
frame[
'FRAME_ID'] = ss[16:18]
650 self.
frame[
"CHECKSUM"] = ss[18:20]
674 if not "ERROR" in response:
680 def export_97_frame(self, ss):
681 self.
frame[
"SOURCE_ADDRESS_DEC"] = str(int(ss[8:12], 16)).zfill(5)
682 self.
frame[
"SOURCE_ADDRESS_HEX"] = ss[8:12]
683 self.
frame[
"RSSI"] = int(ss[12:14], 16) - 256
684 self.
frame[
"OPTIONS"] = ss[14:16]
685 self.
frame[
"AT_COMMAND"] = binascii.unhexlify(ss[16:20]).decode(
"ascii")
686 self.
frame[
"STATUS"] = ss[20:22]
688 self.
frame[
"AT_PARAMETER_HEX"] = []
689 self.
frame[
"AT_PARAMETER"] = []
691 self.
frame[
"AT_PARAMETER_HEX"] = ss[22:len(ss) - 2]
692 self.
frame[
"AT_PARAMETER"] = binascii.unhexlify(ss[22:len(ss) - 2])
693 self.
frame[
"CHECKSUM"] = ss[len(ss) - 2 : len(ss)]
735 if not "ERROR" in response:
741 def export_83_frame(self, ss):
742 self.
frame[
"SOURCE_ADDRESS_DEC"] = str(int(ss[8:12], 16)).zfill(5)
743 self.
frame[
"SOURCE_ADDRESS_HEX"] = ss[8:12]
744 self.
frame[
"RSSI"] = int(ss[12:14], 16) - 256
745 self.
frame[
"OPTIONS"] = ss[14:16]
746 self.
frame[
"TEMPERATURE"] = int(ss[16:18], 16)
747 self.
frame[
"VCC"] = round(int(ss[18:20], 16) / 51, 2)
748 self.
frame[
"DATA_HEX"] = ss[20:len(ss) - 2]
749 self.
frame[
"DATA_PARSED"] = {}
750 self.
frame[
"DATA_PARSED_DECODED"] = {}
752 while i < len(ss) - 2:
753 pin_number = ss[i:i + 2]
756 pin_mode = ss[i:i + 2]
757 pin_mode_decoded = format((int(pin_mode, 16) & 0x7F),
'X').zfill(2)
760 if sample_length == 0:
761 if int(pin_mode, 16) & 0x80:
766 value = ss[i:i + sample_length]
768 self.
frame[
"DATA_PARSED"][pin_id] = {pin_mode_decoded: value}
770 self.
frame[
"DATA_RAW"] = binascii.unhexlify(ss[20:len(ss) - 2])
771 self.
frame[
"CHECKSUM"] = ss[len(ss) - 2 : len(ss)]
782 if not "ERROR" in response:
Класс для проектов, основанных на программном обеспечении SerialStar.
def sub_hex2(self, hex1, hex2)
def send_queue_local_at(self, frame_id, at_command, at_parameter="")
Отправка фрейма с локальной AT-командой с помещением параметра в очередь: API-фрейм 0x09...
def send_local_at(self, frame_type, frame_id, at_command, at_parameter="")
def send_remote_at(self, frame_id, destination_id, options, at_command, at_parameter="")
Отправка фрейма с AT-командой удаленному модулю: API-фрейм 0x17.
int LOCAL_RESPONSE_RETRY_COUNT
dictionary GET_SAMPLE_LENGTH
def run(self)
Метод, осуществляющий постоянный опрос буфера последовательного порта с целью обнаружения в нем всех ...
def get_tx_request_without_option(self)
Получение фрейма с данными от удаленного модуля, переданными без байта опций: API-фрейм 0x8F...
on_rx_87_88_89_frame_callback
def export_81_8F_frame(self, ss)
def add_hex2(self, hex1, hex2)
on_rx_81_8F_frame_callback
def read_frame_infos(self)
def get_frame_common_fields(self)
float API_FRAME_TX_TIMEOUT
def get_tx_request(self)
Получение фрейма с данными от удаленного модуля, переданными с байтом опций: API-фрейм 0x81...
def get_immidiate_apply_local_at_response(self)
Получение фрейма, подтверждающего прием пакета с локальной AT-командой с немедленным применением изме...
def export_8B_frame(self, ss)
def export_8A_frame(self, ss)
def get_queue_local_at_response(self)
Получение фрейма, подтверждающего прием пакета с локальной AT-командой и помещением параметра в очере...
def send_tx_request(self, frame_id, destination_id, data, options="")
Отправка фрейма с данными удаленному модулю, которые должны быть переданы им в UART: API-фреймы 0x10...
def get_io_sample_rx(self)
Получение фрейма с данными о состоянии линий ввода вывода удаленного модуля: API-фрейм 0x83...
def callback_registring(self, frame_type, function_name)
Метод для регистрации callback-функции.
def get_local_tx_status(self)
Получение фрейма со статусом отправки пакета, предназначенного для передачи в эфир: API-фрейм 0x8B...
def export_97_frame(self, ss)
int IO_SAMPLE_RX_RETRY_COUNT
def __init__(self, port, baud)
int REMOTE_RX_STATUS_RETRY_COUNT
def read_rx_api(self, type=[])
def send_immidiate_apply_and_save_local_at(self, frame_id, at_command, at_parameter="")
Отправка фрейма с локальной AT-командой с немедленным применением и сохранением изменений в энергонез...
int GET_FRAME_RETRY_COUNT
int LOCAL_TX_STATUS_RETRY_COUNT
def get_immidiate_apply_and_save_local_at_response(self)
Получение фрейма, подтверждающего прием пакета с локальной AT-командой с немедленным применением и со...
def get_remote_at_command_response(self)
Получение фрейма с ответом от удаленного модуля на отправленную ему AT-команду: API-фрейм 0x97...
def export_83_frame(self, ss)
def export_87_88_89_frame(self, ss)
dictionary PIN_MODE_DECODE
def send_immidiate_apply_local_at(self, frame_id, at_command, at_parameter="")
Отправка фрейма с локальной AT-командой с немедленным применением изменений: API-фрейм 0x07...
def export_8C_frame(self, ss)
def get_remote_tx_status(self)
Получение фрейма с подтверждением доставки пакета с данными удаленному модулю: API-фрейм 0x8C...
def get_modem_status(self)
Получение фрейма со статусом модуля, отправляемого им при рестарте: API-фрейм 0x8A.