Spaces:
Sleeping
Sleeping
| # #!/usr/bin/python | |
| # -*- coding: utf-8 -*- | |
| # test_preempt_return.py | |
| # pylint: disable=line-too-long,missing-docstring,bad-whitespace, unused-argument, too-many-locals | |
| import sys | |
| import os | |
| import random | |
| import unittest | |
| DIRNAME_MODULE = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(sys.argv[0])))) + os.sep | |
| sys.path.append(DIRNAME_MODULE) | |
| sys.path.append(DIRNAME_MODULE + "pyss" + os.sep) | |
| from pyss import pyssobject | |
| from pyss.pyss_model import PyssModel | |
| from pyss.segment import Segment | |
| from pyss.generate import Generate | |
| from pyss.terminate import Terminate | |
| from pyss import logger | |
| from pyss.table import Table | |
| from pyss.handle import Handle | |
| from pyss.enter import Enter | |
| from pyss.leave import Leave | |
| from pyss.storage import Storage | |
| from pyss.advance import Advance | |
| from pyss.preempt import Preempt | |
| from pyss.g_return import GReturn | |
| from pyss.facility import Facility | |
| from pyss.seize import Seize | |
| from pyss.release import Release | |
| from pyss.transfer import Transfer | |
| from pyss.test import Test | |
| from pyss.pyss_const import * | |
| class TestPreemptReturn(unittest.TestCase): | |
| def setUp(self): | |
| pass | |
| def tearDown(self): | |
| pass | |
| # @unittest.skip("testing skipping test_preempt_return_001") | |
| def test_preempt_return_001(self): | |
| """Тест Preempt - Return | |
| Формируется один транзакт в момент времени 1. | |
| Прерывает работу устройства F_1 на 5 единиц времени. | |
| Выходит из модели в момент времени 6. | |
| """ | |
| logger.info("--- test_preempt_return_001 ----------------------------------") | |
| ### MODEL ---------------------------------- | |
| m = PyssModel() | |
| sgm = Segment(m) | |
| # | |
| m[OPTIONS].setAllFalse() | |
| MAX_TIME = 20 | |
| # | |
| list_all_transact = [] | |
| # | |
| MAX_TIME = 20 | |
| # | |
| F_1 = "F_1" | |
| # ОКУ | |
| Facility(m, facilityName=F_1) | |
| # | |
| def funcTransactTo_list_all_transact(owner, transact): | |
| # складируем транзакты в список | |
| list_all_transact.append(transact) | |
| ### SEGMENT ---------------------------- | |
| # формируется одна заявка в момент времени 1 | |
| Generate(sgm, med_value=None, | |
| modificatorFunc=None, | |
| first_tx=1, | |
| max_amount=1) | |
| Handle(sgm, handlerFunc=funcTransactTo_list_all_transact) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertNotIn(F_1, t[FACILITY])) | |
| # | |
| Preempt(sgm, facilityName=F_1) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertIn(F_1, t[FACILITY])) | |
| # | |
| Advance(sgm, meanTime=5, modificatorFunc=None) | |
| GReturn(sgm, facilityName=F_1) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:not self.assertNotIn(F_1, t[FACILITY])) | |
| # | |
| Terminate(sgm, deltaTerminate=0) | |
| # ЗАПУСК ---------------------- | |
| m.start(terminationCount=MAX_TIME, maxTime=MAX_TIME) | |
| # ТЕСТЫ ---------------------- | |
| for t in list_all_transact: | |
| self.assertEqual(t[TIME_CREATED], 1) | |
| self.assertEqual(t[TERMINATED_TIME], 6) | |
| print str(["%s:%s" % (k, t[k]) | |
| for k in t.keys() if k | |
| in [TIME_CREATED, TERMINATED_TIME]]) | |
| # @unittest.skip("testing skipping test_preempt_return_002") | |
| def test_preempt_return_002(self): | |
| """Тест Preempt - Return | |
| Формируется транзакт A в момент времени 1. | |
| Идёт на обработку устройством F_1 в течение 3 единиц времени. | |
| Формируется транзакт B в момент времени 2. | |
| Прерывает работу устройства на 5 единиц времени. | |
| Транзакт B выходит из модели в момент времени 7. | |
| Транзакт А выходит из модели в момент времени 9. | |
| Обработка транзакта А была прервана с 2 по 7. | |
| """ | |
| logger.info("--- test_preempt_return_002 ----------------------------------") | |
| ### MODEL ---------------------------------- | |
| m = PyssModel() | |
| sgm = Segment(m) | |
| # | |
| m[OPTIONS].setAllFalse() | |
| MAX_TIME = 20 | |
| # CONSTS | |
| TRANSACT_A = "A" | |
| TRANSACT_B = "B" | |
| # | |
| list_all_transact = [] | |
| tA = [] | |
| tB = [] | |
| # | |
| F_1 = "F_1" | |
| # ОКУ | |
| facility_1 = Facility(m, facilityName=F_1) | |
| # | |
| def funcTransactTo_list_all_transact(owner, transact): | |
| # складируем транзакты в список | |
| list_all_transact.append(transact) | |
| def setTransactLabel(owner, transact): | |
| if transact[NUM] == 1: | |
| transact[LABEL] = TRANSACT_A | |
| tA.append(transact) | |
| elif transact[NUM] == 2: | |
| transact[LABEL] = TRANSACT_B | |
| tB.append(transact) | |
| # функция проверки условия | |
| def checkTest(o): | |
| t=m.getCurrentTransact() | |
| if t[LABEL] == TRANSACT_B: | |
| return False | |
| return True | |
| def printAllTransact(owner, transact): | |
| print "Time=%s" % str(m.getCurTime()) | |
| print "\n".join([str(t) for t in list_all_transact]) | |
| print "tA=%s" % str(tA[0]) | |
| print "tB=%s" % str(tB[0]) | |
| ### SEGMENT ---------------------------- | |
| # формируется одна заявка в момент времени 1 | |
| Generate(sgm, | |
| med_value=1, | |
| modificatorFunc=None, | |
| first_tx=1, | |
| max_amount=2) | |
| # вспомогательные операции | |
| Handle(sgm, handlerFunc=funcTransactTo_list_all_transact) | |
| Handle(sgm, handlerFunc=setTransactLabel) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertNotIn(F_1, t[FACILITY])) | |
| # | |
| # первый транзакт проходит, второй направляется к метке "to_preempt" | |
| Test(sgm, funcCondition=checkTest, move2block="to_preempt") | |
| # только первый транзакт | |
| Seize(sgm, facilityName=F_1) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertIn(F_1, t[FACILITY])) | |
| # | |
| Advance(sgm, meanTime=3, modificatorFunc=None) | |
| Release(sgm, facilityName=F_1) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertNotIn(F_1, t[FACILITY])) | |
| # | |
| Transfer(sgm, funcTransfer=lambda o, t: o.findBlockByLabel("to_term")) | |
| #--- | |
| # только второй транзакт | |
| Preempt(sgm, facilityName=F_1, label="to_preempt") | |
| # test | |
| # .addBlock(handle.Handle(handlerFunc=lambda o,t:self.assertEqual(tA[0][REMAIND_TIME], None))) | |
| Handle(sgm, handlerFunc=printAllTransact) | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertIn(F_1, t[FACILITY])) | |
| # | |
| Handle(sgm, handlerFunc=printAllTransact) | |
| Advance(sgm, meanTime=5, modificatorFunc=None) | |
| GReturn(sgm, facilityName=F_1) | |
| # test | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertEqual(tA[0][REMAIND_TIME], 2)) | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertEqual(tA[0][SCHEDULED_TIME], 9)) | |
| Handle(sgm, handlerFunc=lambda o, t:self.assertNotIn(F_1, t[FACILITY])) | |
| # | |
| Handle(sgm, handlerFunc=printAllTransact) | |
| # все транзакты | |
| Terminate(sgm, label="to_term", deltaTerminate=0) | |
| # ЗАПУСК ---------------------- | |
| m.start(terminationCount=MAX_TIME, maxTime=MAX_TIME) | |
| # ТЕСТЫ ---------------------- | |
| for t in list_all_transact: | |
| # Формируется транзакт A в момент времени 1. | |
| # Идёт на обработку устройством F_1 в течение 3 единиц времени. | |
| # Формируется транзакт B в момент времени 2. | |
| # Прерывает работу устройства на 5 единиц времени. | |
| # Транзакт B выходит из модели в момент времени 7. | |
| # Транзакт А выходит из модели в момент времени 9. | |
| # Обработка транзакта А была прервана с 2 по 7. | |
| print str(["%s:%s" % (k, t[k]) | |
| for k in t.keys() if k | |
| in [TIME_CREATED, TERMINATED_TIME, LIFE_TIME_LIST]]) | |
| if t[LABEL] == TRANSACT_A: | |
| self.assertEqual(t[TIME_CREATED], 1) | |
| self.assertEqual(t[REMAIND_TIME], 2) | |
| self.assertEqual(t[TERMINATED_TIME], 9) | |
| self.assertListEqual(t[LIFE_TIME_LIST], [ | |
| {'start': 1, 'state': 'actived'}, | |
| {'start': 2, 'state': 'preempted'}, | |
| {'start': 7, 'state': 'actived'}, | |
| {'start': 9, 'state': 'deleted'}]) | |
| elif t[LABEL] == TRANSACT_B: | |
| self.assertEqual(t[TIME_CREATED], 2) | |
| self.assertEqual(t[TERMINATED_TIME], 7) | |
| self.assertListEqual(t[LIFE_TIME_LIST], [ | |
| {'start': 2, 'state': 'actived'}, | |
| {'start': 7, 'state': 'deleted'}]) | |
| if __name__ == '__main__': | |
| unittest.main(module="test_preempt_return") | |