1
0
mirror of https://github.com/3cky/mbusd synced 2025-10-26 23:46:44 +08:00

Merge pull request #35 from nickma82/test_simplification

Test simplification
This commit is contained in:
Victor Antonovich 2018-06-19 19:31:43 +04:00 committed by GitHub
commit a3a6530eed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 70 deletions

View File

@ -1,7 +1,7 @@
--- ---
stages: stages:
- build
- test - test
- build
.mbusd_job_template: &mbusd_job_template # Hidden key that defines an anchor .mbusd_job_template: &mbusd_job_template # Hidden key that defines an anchor
image: debian:stable image: debian:stable
@ -22,7 +22,7 @@ build:fedora_x86:
image: fedora:latest image: fedora:latest
stage: build stage: build
before_script: before_script:
- yum -y install cmake pkgconfig gcc-c++ rpmdevtools - yum -y install make cmake pkgconfig gcc-c++ rpmdevtools
script: script:
- mkdir output.dir/ - mkdir output.dir/
- cd output.dir - cd output.dir
@ -46,7 +46,7 @@ build:deb_x86:
stage: build stage: build
before_script: before_script:
- apt update -qq - apt update -qq
- apt install -y -qq build-essential pkg-config cmake - apt install -y -qq --no-install-recommends build-essential pkg-config cmake
script: script:
- mkdir output.dir/ - mkdir output.dir/
- cd output.dir - cd output.dir
@ -60,17 +60,15 @@ test_x86:
#https://forum.gitlab.com/t/testing-copy-yaml-file-to-build-folder/8309 #https://forum.gitlab.com/t/testing-copy-yaml-file-to-build-folder/8309
before_script: before_script:
- apt update -qq - apt update -qq
- apt install -y -qq build-essential pkg-config cmake - apt install -y -qq --no-install-recommends build-essential pkg-config cmake
- apt install -y -qq python-dev python-pip python-setuptools socat - apt install -y -qq --no-install-recommends python-dev python-pip python-setuptools socat
- python -m pip install pymodbus service_identity - python -m pip install pymodbus service_identity twisted
dependencies:
- build:deb_x86
script: script:
- mkdir -p output.dir/ - mkdir -p output.dir/ && cd $_
- cd output.dir - cmake ../
- cmake -DTESTS=True ../ - make
- make VERBOSE=1
# execute all tests # execute all tests
- ./test_* - (cd ../ && python tests/run_itests.py output.dir/mbusd)
#- make VERBOSE=1 CTEST_OUTPUT_ON_FAILURE=1 test

View File

View File

@ -8,6 +8,7 @@ The asynchronous server is a high performance implementation using the
twisted library as its backend. This allows it to scale to many thousands twisted library as its backend. This allows it to scale to many thousands
of nodes which can be helpful for testing monitoring software. of nodes which can be helpful for testing monitoring software.
''' '''
import logging
#---------------------------------------------------------------------------# #---------------------------------------------------------------------------#
# import the various server implementations # import the various server implementations
#---------------------------------------------------------------------------# #---------------------------------------------------------------------------#
@ -26,10 +27,7 @@ class ModbusSerialServer:
#---------------------------------------------------------------------------# #---------------------------------------------------------------------------#
# configure the service logging # configure the service logging
#---------------------------------------------------------------------------# #---------------------------------------------------------------------------#
import logging log = logging.getLogger("ModbusServer")
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
serialPort = readlink('/tmp/pts1') serialPort = readlink('/tmp/pts1')
@ -126,9 +124,8 @@ class ModbusSerialServer:
self.p.start() self.p.start()
print("p.start done") print("p.start done")
def kill(self): def kill(self):
print("Going to terminate the process, this could throw exceptions") self.log.info("Going to terminate the process, this could throw exceptions")
if self.p is not None: if self.p is not None:
self.p.terminate() self.p.terminate()

View File

@ -1,28 +0,0 @@
#!/usr/bin/env bash
RTU_SLAVE_PID=/tmp/rtu_slave.pid
CURRENT_DIR="$(dirname "$(realpath "$0")")"
. $CURRENT_DIR/subprocess_helper.sh
check_preconditions() {
python -c "import pymodbus" || exit 1
}
# check argument count
## https://stackoverflow.com/questions/4341630/checking-for-the-correct-number-of-arguments
if [ "$#" -ne 1 ]; then
echo "[E] usage: $0 <start|stop>" >&2
exit 1
fi
check_preconditions
case "$1" in
up|start)
CMD="python ${CURRENT_DIR}/rtu_slave.py &"
run_cmd_save_pid "$CMD" $RTU_SLAVE_PID
;;
down|stop)
kill_pid $RTU_SLAVE_PID
;;
esac

View File

@ -2,25 +2,59 @@
import random import random
import unittest import unittest
import sys
import logging
from subprocess import Popen
from os.path import isfile
from time import sleep
from pymodbus.client.sync import ModbusTcpClient from pymodbus.client.sync import ModbusTcpClient
from pymodbus.pdu import ExceptionResponse, ModbusExceptions from pymodbus.pdu import ExceptionResponse
from pymodbus.bit_read_message import ReadDiscreteInputsResponse, ReadCoilsResponse from pymodbus.bit_read_message import ReadDiscreteInputsResponse, ReadCoilsResponse
from pymodbus.bit_write_message import WriteMultipleCoilsResponse, WriteSingleCoilResponse from pymodbus.bit_write_message import WriteMultipleCoilsResponse, WriteSingleCoilResponse
from pymodbus.register_read_message import ReadInputRegistersResponse, ReadHoldingRegistersResponse from pymodbus.register_read_message import ReadInputRegistersResponse, ReadHoldingRegistersResponse
from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse
MBUSD_PORT = 1025 MBUSD_PORT = 1025
MBUSD_BINARY = "../output.dir/mbusd"
class TestModbusRequests(unittest.TestCase): class TestModbusRequests(unittest.TestCase):
log = logging.getLogger("TestModbusRequests")
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.log.debug("1. run socat")
cls.socat = Popen(["socat", "-d", "-d", "pty,raw,echo=0,link=/tmp/pts0", "pty,raw,echo=0,link=/tmp/pts1"])
cls.log.debug("2. run rtu_slave")
from environment.rtu_slave import ModbusSerialServer
cls.mbs = ModbusSerialServer()
cls.mbs.start()
cls.log.debug("3. run mbusd to be tested with the binary:%s" % MBUSD_BINARY)
cls.mbusd_main = Popen([MBUSD_BINARY, "-d", "-L", "-v9", "-p/tmp/pts0", "-s19200", "-P" + str(MBUSD_PORT)])
# wait a little bit for mbusd to come up
# alternatively do a poll for the socket
# https://stackoverflow.com/questions/667640/how-to-tell-if-a-connection-is-dead-in-python/667702#667702
sleep(5)
cls.log.debug("4. connect the modbus TCP client to mbusd")
cls.client = ModbusTcpClient('127.0.0.1', port=MBUSD_PORT) cls.client = ModbusTcpClient('127.0.0.1', port=MBUSD_PORT)
cls.client.connect() cls.client.connect()
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.log.info("test teardown")
cls.log.debug("4. kill tcp_client")
cls.client.close() cls.client.close()
cls.log.debug("3. kill mbusd")
cls.mbusd_main.kill()
cls.log.debug("2. kill rtu_slave")
cls.mbs.kill()
cls.log.debug("1. kill socat")
cls.socat.kill()
def test_coils(self): def test_coils(self):
bits = [random.randrange(2)>0 for i in range(8)] bits = [random.randrange(2)>0 for i in range(8)]
@ -88,6 +122,15 @@ class TestModbusRequests(unittest.TestCase):
self.assertEqual(result.original_code, 5, result) # fc05 Write Single Coil self.assertEqual(result.original_code, 5, result) # fc05 Write Single Coil
self.assertEqual(result.exception_code, 2, result) # Illegal Data Address self.assertEqual(result.exception_code, 2, result) # Illegal Data Address
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(level=logging.DEBUG,
format=u'[%(asctime)s] %(name)-26s-%(levelname)-5s %(funcName)-20s:%(lineno)-4d \033[35m%(message)s\033[0m',
datefmt='%d.%m. %H:%M:%S',
handlers=[stdout_handler])
if len(sys.argv) != 2 or not isfile(sys.argv[1]):
logging.error("usage: ./run_itests.py <mbusd_binary_path>")
sys.exit(1)
MBUSD_BINARY = sys.argv[1]
unittest.main(verbosity=2, argv=[sys.argv[0]])

View File

@ -8,24 +8,6 @@ if ! [ -x "$1" ]; then
fi fi
export MBUSD_BIN=$1 export MBUSD_BIN=$1
$CWD/run_itests.py "$MBUSD_BIN" || exit 1
function setup() {
echo "[I] do test environment setup"
$CWD/environment/socat_runner.sh start || return 1
$CWD/environment/rtu_slave_runner.sh start || return 1
$CWD/environment/mbusd_runner.sh start || return 1
}
function teardown() {
echo "[I] do test environment teardown"
$CWD/environment/mbusd_runner.sh stop
$CWD/environment/rtu_slave_runner.sh stop
$CWD/environment/socat_runner.sh stop
}
trap teardown EXIT
setup || exit 1
$CWD/run_itests.py || exit 1
exit 0 exit 0