123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- '''
- Created on 24.06.2014
-
- @author: volkma_s
- '''
- import os, sys
- import urllib
- sys.path.append(os.path.join(os.path.dirname(__file__), "./../"))
-
- #import daemon
-
- from http.server import BaseHTTPRequestHandler, HTTPServer
- from socketserver import ThreadingMixIn
- import time
- from urllib.parse import urlparse
- from utils.web import handle_file_request, get_my_ip
- from mc_control.mc_statemachine import mc_state_machine, mc_event
- from gpiozero import LED
- from gpiozero.pins.pigpio import PiGPIOFactory
-
-
# Address the HTTP server binds to. The empty string listens on all
# interfaces; use 'localhost' to accept local connections only.
HOST_NAME = ''
# TCP port the HTTP server listens on.
PORT_NUMBER = 8000
-
def HandleQeury(query):
    """Translate an HTTP query string into an event for one state machine.

    The query is expected to carry ``port`` (an index into the module-level
    ``sm`` list) and ``action`` (``up``, ``stop`` or ``down``). The matching
    ``mc_event`` is put into the ``MailBox`` of the selected state machine.

    The query comes straight from the request URL, so malformed input
    (missing or non-numeric ``port``, missing ``action``, unknown action,
    port out of range) is reported on stdout and ignored instead of raising
    inside the request handler.

    Args:
        query: Raw query string, e.g. ``'action=up&port=0'``.
    """
    print("HandleQuery query={}".format(query))
    query_components = urllib.parse.parse_qs(query)

    try:
        port = int(query_components['port'][0])
        action = query_components['action'][0]
    except (KeyError, ValueError):
        # parse_qs drops absent keys; int() rejects non-numeric ports.
        print("Unknown query: {}".format(query))
        return

    # mc_event is only touched for a recognised action.
    if action == 'up':
        label, event = "UP Request", mc_event.Up
    elif action == 'stop':
        label, event = "Stop Request", mc_event.Stop
    elif action == 'down':
        label, event = "Down Request", mc_event.Down
    else:
        print("Unknown query: {}".format(query))
        return

    # A negative index would silently wrap around to another motor, and a
    # too-large one would raise IndexError; reject both explicitly.
    if not 0 <= port < len(sm):
        print("Unknown port: {}".format(port))
        return

    print(label)
    sm[port].MailBox.put(event)
-
def GetState():
    """Return the position of the first state machine, formatted as text."""
    first_machine = sm[0]
    return format(first_machine.position.position)
-
def GetStates():
    """Return the positions of state machines 0..3 as a comma-separated string."""
    # Read all four positions first, then format them, in port order.
    positions = [sm[index].position.position for index in range(4)]
    return ",".join(format(value) for value in positions)
-
class myhandler(BaseHTTPRequestHandler):
    """Request handler serving the web UI files and accepting motor commands.

    Any request whose URL carries a query string forwards that query to
    ``HandleQeury`` before the response payload is produced.
    """

    def _send_ok(self, content_type):
        """Send a 200 status line and a single Content-type header."""
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.end_headers()

    def do_HEAD(self):
        """Answer HEAD with 200 and an HTML content type; no body."""
        self._send_ok('text/html')

    def do_GET(self):
        """Process an optional command query, then send the requested resource.

        Paths ending in ``.html``, ``.css`` or ``.js`` are loaded through
        ``handle_file_request``; a path ending in ``cmd`` returns the output
        of ``GetStates``; every other path falls back to ``index.html``.
        """
        url = urlparse(self.path)
        path = url.path

        if url.query:
            HandleQeury(url.query)

        print("do_GET path=" + path)

        # Pick the content type and the file to load; ``None`` as the
        # resource means "answer with the current states instead of a file".
        if path.endswith('.html'):
            content_type, resource = 'text/html', path
        elif path.endswith('.css'):
            content_type, resource = 'text/css', path
        elif path.endswith('.js'):
            content_type, resource = 'text/javascript', path
        elif path.endswith('cmd'):
            content_type, resource = 'text/html', None
        else:
            content_type, resource = 'text/html', 'index.html'

        # Headers go out before the payload is produced.
        self._send_ok(content_type)

        if resource is None:
            data = GetStates().encode('utf-8')
        else:
            # NOTE(review): handle_file_request presumably returns bytes,
            # since the result is written to wfile as-is - confirm.
            data = handle_file_request(resource, 'utf-8')

        self.wfile.write(data)

    def do_POST(self):
        """Acknowledge with 200, then process an optional command query."""
        self._send_ok('text/html')

        command = urlparse(self.path).query
        if command:
            HandleQeury(command)
-
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ThreadingMixIn: one thread per request."""
-
def StartStateMachines(factory=None):
    """Create the four motor state machines and publish them in ``sm``.

    Each state machine is built from one "up" and one "down" output. The
    module-level ``sm`` list is replaced, and its index is the ``port``
    used by ``HandleQeury``.

    Args:
        factory: gpiozero pin factory passed to every ``LED``. Defaults to
            ``None``, which leaves the choice to gpiozero's default pin
            factory, so the function can also be called without arguments.
    """
    global sm
    sm = []

    # (up pin, down pin) per channel, in port order 0..3.
    pin_pairs = ((16, 12), (6, 20), (13, 21), (19, 26))

    # Claim every pin first, then create the state machines, so the order
    # of side effects matches the original hand-written sequence.
    outputs = [
        (LED(up_pin, pin_factory=factory), LED(down_pin, pin_factory=factory))
        for up_pin, down_pin in pin_pairs
    ]
    for up_output, down_output in outputs:
        sm.append(mc_state_machine(up_output, down_output))
-
def CloseStateMachines():
    """Call ``Stop()`` on state machines 0..3 of the module-level ``sm`` list."""
    for index in range(4):
        sm[index].Stop()
-
-
if __name__ == '__main__':
    # Script entry point: start the motor state machines, then serve HTTP
    # requests until interrupted with Ctrl-C.
    factory = PiGPIOFactory(host='jalousiecontrol')
    StartStateMachines(factory)

    try:
        httpd = ThreadedHTTPServer((HOST_NAME, PORT_NUMBER), myhandler)
        try:
            print('{} - http Server started - http://{}:{}'.format(
                time.asctime(), get_my_ip(), PORT_NUMBER))
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("end")
        finally:
            # Release the listening socket whichever way serving ended.
            httpd.server_close()
    finally:
        # Stop the state machines even when the server could not be created
        # or terminated with an unexpected error.
        CloseStateMachines()

    print('{} - http Server stopped'.format(time.asctime()))
-
-
- import unittest
-
class WebServer_UnitTest(unittest.TestCase):
    """Smoke test: every action can be dispatched to every port without error."""

    def setUp(self):
        # StartStateMachines needs a pin factory. gpiozero's mock factory
        # lets the test run without GPIO hardware or a pigpio daemon.
        # NOTE(review): assumes mc_state_machine works with mock pins - confirm.
        from gpiozero.pins.mock import MockFactory
        StartStateMachines(MockFactory())

    def tearDown(self):
        CloseStateMachines()

    def test_HandleQuery(self):
        """Send up, down and stop to each of the four ports in turn."""
        for port in range(4):
            for action in ('up', 'down', 'stop'):
                HandleQeury('action={}&port={}'.format(action, port))
-
-
-
|