202 lines
5.9 KiB
Python

'''
Created on 24.06.2014
@author: volkma_s
'''
import os, sys
import urllib
sys.path.append(os.path.join(os.path.dirname(__file__), "./../"))
#import daemon
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
import time
from urllib.parse import urlparse
from utils.web import handle_file_request, get_my_ip
from mc_control.mc_statemachine import mc_state_machine, mc_event
from gpiozero import LED
from gpiozero.pins.pigpio import PiGPIOFactory
# Host part of the address the HTTP server binds to. The empty string makes
# the socket listen on all available interfaces.
HOST_NAME = '' #'localhost'
# TCP port the HTTP server listens on.
PORT_NUMBER = 8000
def HandleQeury(query):
    """Translate a URL query string into an event for one state machine.

    The query must carry a ``port`` parameter (index into the module-level
    ``sm`` list) and an ``action`` parameter (``up``, ``stop`` or ``down``).
    The matching ``mc_event`` member is put into ``sm[port].MailBox``.

    Malformed requests -- a missing parameter, a non-integer port, a port
    outside ``sm`` or an unknown action -- are reported on stdout and
    ignored, so a bad URL cannot raise inside the HTTP request handler.

    Args:
        query: Raw query string, e.g. ``'action=up&port=0'``.
    """
    print("HandleQuery query={}".format(query))
    query_components = urllib.parse.parse_qs(query)
    try:
        port = int(query_components['port'][0])
        action = query_components['action'][0]
    except (KeyError, ValueError):
        # parse_qs drops blank values, so "port=" also ends up here.
        print("Invalid query: {}".format(query))
        return

    # Negative values are rejected explicitly: list indexing would otherwise
    # wrap around and address a machine counted from the end.
    if not 0 <= port < len(sm):
        print("Invalid port {} in query: {}".format(port, query))
        return

    if action == 'up':
        print("UP Request")
        sm[port].MailBox.put(mc_event.Up)
    elif action == 'stop':
        print("Stop Request")
        sm[port].MailBox.put(mc_event.Stop)
    elif action == 'down':
        print("Down Request")
        sm[port].MailBox.put(mc_event.Down)
    else:
        print("Unknown query: {}".format(query))
def GetState():
    """Return the position of the first state machine formatted as a string."""
    first_machine = sm[0]
    return "{}".format(first_machine.position.position)
def GetStates():
    """Return the positions of all state machines as a comma-separated string.

    One value is emitted per entry of the module-level ``sm`` list, in list
    order, so the result follows the number of configured machines instead
    of assuming exactly four.

    Returns:
        A string such as ``"0,10,20,30"`` for four machines.
    """
    return ",".join("{}".format(machine.position.position) for machine in sm)
class myhandler(BaseHTTPRequestHandler):
    """Request handler for the static web pages and the ``cmd`` endpoint.

    Any query string attached to a GET or POST request is forwarded to
    ``HandleQeury`` before the response body is produced.
    """

    def _send_ok_headers(self, content_type):
        """Send a 200 status line and a single Content-type header."""
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.end_headers()

    def do_HEAD(self):
        """Answer HEAD with a 200 status and an HTML content type."""
        self._send_ok_headers('text/html')

    def do_GET(self):
        """Serve a static file, the machine states, or the index page.

        Paths ending in ``.html``, ``.css`` or ``.js`` are fetched through
        ``handle_file_request``; a path ending in ``cmd`` returns the output
        of ``GetStates``; everything else falls back to ``index.html``.
        """
        url = urlparse(self.path)
        path = url.path
        if url.query:
            HandleQeury(url.query)
        print("do_GET path=" + path)

        # Checked in order; the first matching suffix decides the MIME type.
        static_types = (
            ('.html', 'text/html'),
            ('.css', 'text/css'),
            ('.js', 'text/javascript'),
        )
        for suffix, content_type in static_types:
            if path.endswith(suffix):
                self._send_ok_headers(content_type)
                # presumably returns the file content as bytes -- confirm in utils.web
                body = handle_file_request(path, 'utf-8')
                break
        else:
            # Headers go out first, exactly as for the static files above.
            self._send_ok_headers('text/html')
            if path.endswith('cmd'):
                body = GetStates().encode('utf-8')
            else:
                body = handle_file_request('index.html', 'utf-8')
        self.wfile.write(body)

    def do_POST(self):
        """Acknowledge the request, then forward its query string, if any."""
        self._send_ok_headers('text/html')
        request_query = urlparse(self.path).query
        if request_query:
            HandleQeury(request_query)
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    '''HTTPServer combined with ThreadingMixIn, so each request is handled in its own thread.'''
def StartStateMachines(factory=None):
    """Create the four motor state machines and store them in ``sm``.

    Each machine is built from a pair of ``LED`` outputs, one per direction,
    created on fixed GPIO pin numbers.

    Args:
        factory: Pin factory handed to every ``LED`` as ``pin_factory``.
            Defaults to ``None``, which leaves the choice of factory to
            gpiozero, so the function can also be called without arguments.
    """
    global sm
    # (up pin, down pin) per machine, in the order the machines appear in sm.
    pin_pairs = (
        (16, 12),
        (6, 20),
        (13, 21),
        (19, 26),
    )
    # All outputs are created before the first state machine is built.
    outputs = [
        (LED(up_pin, pin_factory=factory), LED(down_pin, pin_factory=factory))
        for up_pin, down_pin in pin_pairs
    ]
    sm = [mc_state_machine(up, down) for up, down in outputs]
def CloseStateMachines():
    """Call ``Stop()`` on every state machine in the module-level ``sm`` list.

    Iterates over the list instead of four fixed indices, so no machine is
    left running and no IndexError is raised when the count is not four.
    """
    for machine in sm:
        machine.Stop()
if __name__ == '__main__':
    # Script entry point: start the state machines on the remote pigpio host,
    # serve HTTP until interrupted, then shut everything down again.
    factory = PiGPIOFactory(host='jalousiecontrol')
    StartStateMachines(factory)
    try:
        httpd = ThreadedHTTPServer((HOST_NAME, PORT_NUMBER), myhandler)
        try:
            print('{} - http Server started - http://{}:{}'.format(time.asctime(), get_my_ip(), PORT_NUMBER))
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("end")
        finally:
            # Release the listening socket on every exit path.
            httpd.server_close()
    finally:
        # Stop the machines even when the server could not be created or
        # serve_forever() failed with something other than Ctrl-C.
        CloseStateMachines()
    print('{}http Server stopped '.format(time.asctime()))
import unittest
class WebServer_UnitTest(unittest.TestCase):
    """Smoke test that sends every action to every state machine."""

    def setUp(self):
        # StartStateMachines takes the pin factory as an argument; None is
        # passed explicitly so gpiozero falls back to its default factory.
        StartStateMachines(None)

    def tearDown(self):
        CloseStateMachines()

    def test_HandleQuery(self):
        """Post up, down and stop to ports 0-3 and expect no exception.

        The resulting machine states are not asserted.
        """
        for port in range(4):
            for action in ('up', 'down', 'stop'):
                with self.subTest(port=port, action=action):
                    HandleQeury('action={}&port={}'.format(action, port))