Sky/server.py

212 lines
6.8 KiB
Python

#!/usr/bin/env python3
"Sky - A simple weather monitor"
import re
import sqlite3
from datetime import datetime
from textwrap import wrap
import darksky
from flask import Flask
from mapbox import Geocoder
from pytz import timezone
import config
from columnizer import Columnizer
from ascii import arts
GEOCODER = Geocoder(access_token=config.mapbox_key)
APP = Flask(__name__)
CACHE_DB = sqlite3.connect("cache.sqlite", check_same_thread=False)
def get_coordinates(location, geocoder, cache_db):
"Returns the coordinates for a given location"
row = cache_db.execute("select lat,lon,full_name from location where "
"name = ?", (location,)).fetchone()
if row is not None:
return (row[0], row[1], row[2])
loc = geocoder.forward(location)
coordinates = loc.geojson()['features'][0]['center']
name = loc.geojson()['features'][0]['place_name']
cache_db.execute("insert into location(name, lon, lat, full_name) values("
"?,?,?,?)", (location, *coordinates, name))
return (coordinates[1], coordinates[0], name)
def get_weather(coordinates):
"Returns the forecast from the coordinates"
return darksky.forecast(config.dark_sky_key, coordinates[0],
coordinates[1])
def summary_to_c(summary):
"Converts Fahrenheits to Celsius in a string"
# pylint: disable=invalid-name
for i in re.findall(r"[\d.]+°F", summary):
f = float(i[:-2])
c = 5 * (f - 32)/9
summary = re.sub(i, f"{c:3.3}°C", summary)
return summary
def weather_to_text(forecast, kind, unit, name):
"Converts the forecast to text"
text = name + "\n"
if unit == "c":
text += summary_to_c(forecast[kind]["summary"]) + "\n\n"
else:
text += forecast[kind]["summary"] + "\n\n"
row_0 = Columnizer()
row_1 = Columnizer()
# Add today
today = [
"Current",
"",
*arts[forecast["currently"]["icon"]],
""
]
if unit == "c":
today += ["{0:.1f}°C".format(
5*(forecast["currently"]["temperature"]-32)/9)]
else:
today += [str(forecast["currently"]["temperature"]) + "°F"]
today += [""]
if kind == "daily":
today += [""]
today += wrap(forecast["currently"]["summary"], 18)
row_0.add_column(today)
count = 1
if kind == "daily":
diff = 1
elif kind == "hourly":
diff = 3
for j in range(0, len(forecast[kind]["data"]), diff):
i = forecast[kind]["data"][j]
x = []
date = datetime.fromtimestamp(i["time"],
timezone(forecast["timezone"]))
if kind == "daily":
x += date.strftime("%d %B\n%A").split("\n")
elif kind == "hourly":
x += [date.strftime("%H:%M"), ""]
x += [*arts[i["icon"]]]
x += [""]
if unit == "c":
if kind == "daily":
x += ["Max: {0:.1f}°C".format(5*(i["temperatureMax"]-32)/9)]
x += ["Min: {0:.1f}°C".format(5*(i["temperatureMin"]-32)/9)]
elif kind == "hourly":
x += ["Temp: {0:.1f}°C".format(5*(i["temperature"]-32)/9)]
else:
if kind == "daily":
x += ["Max: {0:.1f}°F".format(i["temperatureMax"])]
x += ["Min: {0:.1f}°F".format(i["temperatureMin"])]
elif kind == "hourly":
x += ["Temp: {0:.1f}°F".format(i["temperature"])]
x += [""]
x += wrap(i["summary"], 18)
if count < 4:
row_0.add_column(x)
elif count < 8:
row_1.add_column(x)
else:
break
count += 1
for i in forecast[kind]["data"]:
x = []
date = datetime.fromtimestamp(i["time"],
timezone(forecast["timezone"]))
x += date.strftime("%d %B\n%A").split("\n")
x += [*arts[i["icon"]]]
x += [""]
if unit == "c":
if kind == "daily":
x += ["Max: {0:.1f}°C".format(5*(i["temperatureMax"]-32)/9)]
x += ["Min: {0:.1f}°C".format(5*(i["temperatureMin"]-32)/9)]
elif kind == "hourly":
x += ["Temp: {0:.1f}°C".format(5*(i["temperature"]-32)/9)]
else:
if kind == "daily":
x += ["Max: {0:.1f}°F".format(i["temperatureMax"])]
x += ["Min: {0:.1f}°F".format(i["temperatureMin"])]
elif kind == "hourly":
x += ["Temp: {0:.1f}°F".format(i["temperature"])]
# x += [i["summary"]]
if count < 4:
row_0.add_column(x)
elif count < 8:
row_1.add_column(x)
else:
break
count += 1
text += str(row_0) + "\n" + str(row_1)
return text
@APP.route('/')
def index():
"/ - Gives the default index page"
text = ("\nSky - A simple weather monitor\n\n"
"Usage\n\n"
"To check weather for your location simply run the following "
"command in a terminal\n\n"
"In Celsius\n"
"+ Weather for one week - curl sky.webionite.com/location \n"
"+ Weather for today - curl sky.webionite.com/location/t\n\n"
"In Fahrenheit\n"
"+ Weather for one week - curl sky.webionite.com/f/location \n"
"+ Weather for today - curl sky.webionite.com/f/location/t\n\n"
"Replace sky.webionite.com/ with sky.webionite.com/p/ on "
"Windows \n\n"
+ config.source + "\n")
return text
def main(location, geocoder, cache_db, kind, unit):
"Main function that returns the weather based on parameters"
try:
coordinates = get_coordinates(location, geocoder, cache_db)
except (IndexError, KeyError):
return "Location Not found"
weather = get_weather(coordinates)
text = weather_to_text(weather, kind, unit, coordinates[2])
return ("\n" + text + "\n\n" + config.source + "\nPowered by Dark Sky - "
"https://darksky.net/poweredby/\n")
def strip_colors(text):
"Strips ANSI colors from a string"
return re.sub('\033\\[[0-9;]+?m', '', text)
@APP.route('/<path:path>')
def url_parser(path):
"Parses the URL"
location = ''
unit = 'c'
plain = False
frequency = 'daily'
for part in path.strip('/').split('/'):
if part == 'f':
unit = 'f'
elif part == 'p':
plain = True
elif part == 't':
frequency = 'hourly'
else:
location = part
if plain:
return strip_colors(main(location, GEOCODER, CACHE_DB, frequency,
unit))
return main(location, GEOCODER, CACHE_DB, frequency, unit)
@APP.after_request
def after(response):
"Sets content type to text/plain"
response.headers['Content-Type'] = "text/plain; charset=utf-8"
return response