202 lines
5.9 KiB
Python
202 lines
5.9 KiB
Python
'''
|
|
Created on 24.06.2014
|
|
|
|
@author: volkma_s
|
|
'''
|
|
import os, sys
|
|
import urllib
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), "./../"))
|
|
|
|
#import daemon
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from socketserver import ThreadingMixIn
|
|
import time
|
|
from urllib.parse import urlparse
|
|
from utils.web import handle_file_request, get_my_ip
|
|
from mc_control.mc_statemachine import mc_state_machine, mc_event
|
|
from gpiozero import LED
|
|
from gpiozero.pins.pigpio import PiGPIOFactory
|
|
|
|
|
|
HOST_NAME = '' #'localhost'
|
|
PORT_NUMBER = 8000
|
|
|
|
def HandleQeury(query):
    """Translate a URL query string into an event for one state machine.

    The query is expected to carry two parameters:

    * ``port``   -- integer index into the module-level ``sm`` list
    * ``action`` -- one of ``up``, ``stop`` or ``down``

    The matching ``mc_event`` member is put into ``sm[port].MailBox``.

    The query string comes directly from the HTTP request line, so malformed
    requests (missing ``port`` or ``action``, non-integer ``port``, ``port``
    outside ``0 .. len(sm) - 1``) are reported on stdout and ignored instead
    of raising inside the request handler.

    :param query: raw query string, e.g. ``'action=up&port=0'``
    :return: None
    """
    global sm

    print("HandleQuery query={}".format(query))
    query_components = urllib.parse.parse_qs(query)

    try:
        port = int(query_components['port'][0])
        action = query_components['action'][0]
    except (KeyError, ValueError):
        # KeyError: parameter absent; ValueError: port is not an integer.
        print("Invalid query: {}".format(query))
        return

    # Negative values are rejected explicitly: sm[-1] would otherwise
    # silently address the last state machine.
    if not 0 <= port < len(sm):
        print("Invalid port in query: {}".format(query))
        return

    if action == 'up':
        print("UP Request")
        sm[port].MailBox.put(mc_event.Up)
    elif action == 'stop':
        print("Stop Request")
        sm[port].MailBox.put(mc_event.Stop)
    elif action == 'down':
        print("Down Request")
        sm[port].MailBox.put(mc_event.Down)
    else:
        print("Unknown query: {}".format(query))
|
|
|
|
def GetState():
    """Return the position of the first state machine, formatted as text.

    Reads ``sm[0].position.position`` from the module-level ``sm`` list.

    :return: the position rendered with ``"{}".format``
    """
    global sm

    first_position = sm[0].position.position
    return "{}".format(first_position)
|
|
|
|
def GetStates():
    """Return the positions of all state machines as one comma-separated string.

    Each entry is ``machine.position.position`` of one element of the
    module-level ``sm`` list, in list order, rendered with ``"{}".format``.
    The number of entries follows ``len(sm)`` rather than being fixed, so the
    function keeps working if the number of configured machines changes; for
    four machines the result has the form ``"a,b,c,d"``.

    :return: comma-separated positions (empty string if ``sm`` is empty)
    """
    global sm

    return ",".join("{}".format(machine.position.position) for machine in sm)
|
|
|
|
class myhandler(BaseHTTPRequestHandler):
    """Request handler: forwards query strings to HandleQeury and answers GETs.

    GET requests are answered either with a file obtained through
    ``handle_file_request`` or, for paths ending in ``cmd``, with the string
    returned by ``GetStates``. Every response is sent with status 200.
    """

    # Path suffix -> Content-type for GET requests answered from a file.
    _FILE_TYPES = (
        ('.html', 'text/html'),
        ('.css', 'text/css'),
        ('.js', 'text/javascript'),
    )

    def _send_ok_headers(self, content_type):
        """Send a 200 status line plus a single Content-type header."""
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.end_headers()

    def do_HEAD(self):
        """Answer HEAD with 200 and a text/html Content-type."""
        self._send_ok_headers('text/html')

    def do_GET(self):
        """Handle an optional query string, then send the response body.

        The headers are sent before the body is produced, matching the order
        in which the response is written to the client.
        """
        url = urlparse(self.path)
        if url.query:
            HandleQeury(url.query)

        print("do_GET path=" + url.path)

        for suffix, content_type in self._FILE_TYPES:
            if url.path.endswith(suffix):
                self._send_ok_headers(content_type)
                payload = handle_file_request(url.path, 'utf-8')
                break
        else:
            # Neither .html, .css nor .js: status request or fallback page.
            self._send_ok_headers('text/html')
            if url.path.endswith('cmd'):
                payload = GetStates().encode('utf-8')
            else:
                payload = handle_file_request('index.html', 'utf-8')

        self.wfile.write(payload)

    def do_POST(self):
        """Answer POST with 200 (no body) and handle an optional query string."""
        self._send_ok_headers('text/html')

        query = urlparse(self.path).query
        if query:
            HandleQeury(query)
|
|
|
|
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with socketserver.ThreadingMixIn.

    The class adds no members of its own; all behaviour comes from the two
    base classes.
    """
|
|
|
|
def StartStateMachines(factory=None):
    """Create four state machines and store them in the module-level ``sm`` list.

    For every machine two gpiozero ``LED`` outputs are created (one "up" pin,
    one "down" pin) and handed to ``mc_state_machine``. The list index of a
    machine is the ``port`` number used by ``HandleQeury``.

    :param factory: pin factory passed as ``pin_factory`` to every ``LED``.
        Defaults to ``None``, which is also gpiozero's own default for
        ``pin_factory``, so the function can be called without arguments.
    :return: None
    """
    global sm

    # (up pin, down pin) GPIO numbers, one pair per state machine.
    pin_pairs = (
        (16, 12),
        (6, 20),
        (13, 21),
        (19, 26),
    )

    # All outputs are created before the first state machine is built.
    outputs = [
        (LED(up_pin, pin_factory=factory), LED(down_pin, pin_factory=factory))
        for up_pin, down_pin in pin_pairs
    ]

    sm = []
    for up_led, down_led in outputs:
        sm.append(mc_state_machine(up_led, down_led))
|
|
|
|
def CloseStateMachines():
    """Call ``Stop()`` on every state machine in the module-level ``sm`` list.

    The machines are stopped in list order. Iterating over ``sm`` instead of
    addressing indices 0..3 keeps this function in step with however many
    machines were actually created.

    :return: None
    """
    global sm

    for machine in sm:
        machine.Stop()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: start the state machines, serve HTTP until
    # interrupted with Ctrl-C, then shut everything down again.

    factory = PiGPIOFactory(host='jalousiecontrol')
    StartStateMachines(factory)

    try:
        httpd = ThreadedHTTPServer((HOST_NAME, PORT_NUMBER), myhandler)
        try:
            print('{} - http Server started - http://{}:{}'.format(time.asctime(), get_my_ip(), PORT_NUMBER))
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("end")
        finally:
            # Release the listening socket however serve_forever() ended.
            httpd.server_close()
    finally:
        # Stop the state machines even if the server could not be created
        # or terminated with an unexpected exception.
        CloseStateMachines()

    print('{}http Server stopped '.format(time.asctime()))
|
|
|
|
|
|
import unittest
|
|
|
|
class WebServer_UnitTest(unittest.TestCase):
    """Smoke test that feeds every action for every port through HandleQeury."""

    def setUp(self):
        # StartStateMachines needs a pin factory; gpiozero's mock factory is
        # used so the test does not depend on a reachable pigpio daemon.
        from gpiozero.pins.mock import MockFactory
        StartStateMachines(MockFactory())

    def tearDown(self):
        CloseStateMachines()

    def test_HandleQuery(self):
        # Sends up, down and stop to each of the four ports. There are no
        # assertions on the resulting state: the test only verifies that
        # queuing the events does not raise.
        # NOTE(review): the original comment expected the state to go from
        # idle over "move down" to "down" -- confirm and assert if intended.
        for port in range(4):
            for action in ('up', 'down', 'stop'):
                HandleQeury('action={}&port={}'.format(action, port))
|
|
|
|
|
|
|