Загрузить файлы в «aid03»

This commit is contained in:
2026-05-25 16:45:21 +03:00
commit 94a994511d
3 changed files with 303 additions and 0 deletions

271
aid03/app.py Normal file
View File

@@ -0,0 +1,271 @@
from flask import Flask, Response, request, redirect
from os import path, system, remove
import datetime
import numpy as np
from time import sleep, time
from subprocess import Popen, PIPE
ap=path.abspath(__file__)
script_dir = path.dirname(ap)
tlfList = "%s/data/tlf.list" % script_dir
app = Flask(__name__)
import telebot
tbToken='7764038785:AAFNwMoOZWhqH9hQxDJsoWqSS3BYL4P9ua4'
bot = telebot.TeleBot(tbToken)
homeToken='5563613923:AAFGYdokQYJfTTQYhJftGZy3KtMDSZg5p6Q';
homebot = telebot.TeleBot(homeToken)
@app.route('/')
def root_dir():
return('Hello!')
@app.route('/ok')
def say_ok():
return('Ok')
@app.route('/gettlf', methods = ['POST', 'GET'])
def get_tlf():
tlfline=""
def check_tlf(tnum):
with open(tlfList, 'r') as f:
lines = [line.rstrip() for line in f]
for l in lines:
b = l.split("\t")
if tnum == b[0]:
tlfline = l
return (True, l)
return (False, "nonum")
opendoor = True
rez = 'Ok'
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
tlf = request.args.get('t')
if not tlf:
try:
tlf = request.form['t']
if not tlf:
tlf = '00000000000'
opendoor = False
rez = 'NT'
except Exception as e:
tlf = '00000000000'
opendoor = False
rez = 'ERR'
with open("./log/tlf.log", 'a') as f:
f.write("%s\t%s\n" % (ts, tlf))
if(opendoor):
chktlf, tlfline = check_tlf(tlf)
if chktlf:
system("curl -X GET http://192.168.50.63/openDoor")
bot.send_message('-1002333671072', "%s - открывание ворот" % tlfline)
else:
rez = 'NN'
return(rez)
@app.route('/doorchk')
def sb_doorchk():
doorid = request.args.get('doorid')
doorstat = request.args.get('doorstat')
vbat = request.args.get('vbat')
temp = request.args.get('t')
hum = request.args.get('h')
msgadd = ""
if not doorid:
return 'ERROR! No door number\n'
if not doorstat:
return 'ERROR! No door status\n'
if not vbat:
vbat = "-1"
#//else:
#// if(float(vbat) < vbat_low):
#// tg_alert("DOOR CHECK [%s] VBAT LOW %s" % (doorid, vbat))
if temp:
msgadd = " / Температура %.2fV" % float(temp)
if hum:
msgadd = "%s / Влажность %.2fV" % (msgadd, float(hum))
msgadd = ""
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if(int(doorstat)==0):
msg = "Калитка %s. Открытие. %s%s\n" % (doorid, ts, msgadd)
else:
msg = "Калитка %s. Закрытие. %s%s\n" % (doorid, ts, msgadd)
uid = "245058979"
#homebot.send_message(uid, msg)
bot.send_message('-1002333671072', msg)
return 'Сообщение отправлено\n'
@app.route('/hm')
def heart_mon():
data = request.args.get('data')
answ = 'OK'
if not data:
data = 'data empty'
answ = 'ERR'
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
with open("./log/heart_mon.stat", 'a') as f:
f.write("%s\t%s\n" % (ts, data))
p=data.split(",")
tm=int(time())
if len(p) >=3 :
ma=p[2].replace(':','')
#cmd="mosquitto_pub -u \"shurik\" -P \"Iehbr2023\" -h 127.0.0.1 -p 1883 -t \"homeassistant/sensor/sugar001/state\" -m '{\"MAC\":\"%s\",\"mmol\":%s,\"temp\":%s,\"time\":%i}'" % (p[2
if len(p) >=5 :
cmd="mosquitto_pub -r -u \"shurik\" -P \"Iehbr2023\" -h 127.0.0.1 -p 1883 -t \"sugarcontrol/sensor/sugar/%s/state\" -m '{\"MAC\":\"%s\",\"mmol\":%s,\"temp\":%s,\"time\":%i,\"sensor\":\"%s\"}'" % (p[4],p[2],p[0], p[1],int(p[3])/1000,p[4])
#cmd="mosquitto_pub -u \"shurik\" -P \"Iehbr2023\" -h 127.0.0.1 -p 1883 -t \"homeassistant/sensor/sugar/%s/state\" -m '{\"MAC\":\"%s\",\"mmol\":%s,\"temp\":%s,\"time\":%i}'" %
else :
cmd="mosquitto_pub -r -u \"shurik\" -P \"Iehbr2023\" -h 127.0.0.1 -p 1883 -t \"homeassistant/sensor/sugar/%s/state\" -m '{\"MAC\":\"%s\",\"mmol\":%s,\"temp\":%s,\"time\":%i}'" % (ma,p[2],p[0], p[1],tm)
print(cmd)
system(cmd)
else:
print("test data?")
return answ
@app.route('/hmShow')
def hm_show():
llimit = 10
log_file = "./log/heart_mon.stat"
new_log = "%s.tmp" % (log_file)
data = ''
try:
if llimit :
system("tail -n%s %s > %s" % (llimit, log_file, new_log))
log_file = new_log
with open(log_file, 'r') as f:
data = f.read()
except Exception as e:
data = "data log is empty\n"
return Response(data, mimetype='text/plain')
import psycopg2
DBName='tgbot' #'lifecontrol'
DBUser='tgbot' #'lifecontrol'
DBPass=''
DBPort='6432'
#DBHost='213.252.97.166'
DBHost='185.212.88.27'
def write2log(logfile, msg):
with open(logfile, "a") as f:
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
f.write("%s: %s\n" % (ts, msg))
@app.route('/drl')
def get_drug_list():
dataval = request.args.get('drp')
if not dataval:
return Response("не задан шаблон для поиска лекарств", mimetype='text/plain')
qs=str('%'+dataval+'%')
q="SELECT name_rus FROM drugs WHERE name_rus LIKE '%s'" % qs.upper()
write2log('./log/get_drug_list.log', q)
conn = psycopg2.connect(dbname=DBName, user=DBUser, password=DBPass, host=DBHost, port=DBPort)
cursor = conn.cursor()
cursor.execute(q)
records = cursor.fetchall()
data='По заданному шаблону лекарства не найдены'
i=1
for r in records:
if(i == 1):
data=r[0]
else:
data=data+"\n"+r[0]
i=i+1
cursor.close()
conn.close()
#data="АСПИРИН\nПЕНТАЛГИН\nПУРГЕН"
return Response(data, mimetype='text/plain')
@app.route('/pox', methods = ['POST']) # pulseoximeter
def pox():
try:
if(not request.headers):
return('ERR headers\n')
if(not request.headers["Title"]):
return('ERR Title\n')
i = request.headers["Title"].find(",")
ap_id = request.headers["Title"][:i]
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
rdat = request.data
#==================================================================
print("ld2410dat: data check")
with open("./log/pox_dat.%s" % ap_id, 'a') as f:
f.write("%s\t%s\t%s\n" % (ts, request.headers["Title"], rdat))
except Exception as e:
print(e)
return("sba work error")
return 'OK!\n'
@app.route('/getmr', methods = ['GET'])
def get_mr60():
datadir = "log"
lcnt=20
mrid = request.args.get('id')
if not mrid:
mrid = "01"
log = "%s/%s/pox_dat.R60_8266_%s" % (script_dir, datadir, mrid)
if not path.isfile(log):
return Response("no log file", mimetype='text/plain')
buf = "time,\t\t\tpulse,\tdyh-e,\tdist,\tmove\n"
databuf = Popen(["tail", "-n%i" % lcnt, log], stdout=PIPE, encoding='utf-8').communicate()[0]
lines = databuf.split("\n")
for line in lines:
p = line.split("\t")
if len(p) >=3 :
tdat = p[1].split(", ")
ttime = p[0].split(".")[0]
buf = buf + "%s\t%s\t%s\t%s\t%s\n" % (ttime, tdat[1], tdat[3], tdat[5], tdat[7])
return Response(buf, mimetype='text/plain')
@app.route('/mr60dat')
def mr60data():
datadir = "log"
lcnt=1
mrid = request.args.get('id')
if not mrid:
log = "%s/%s/pox_dat.R60_GERMAN" % (script_dir, datadir)
else:
log = "%s/%s/pox_dat.R60_8266_%s" % (script_dir, datadir, mrid)
if not path.isfile(log):
return Response("no log file %s" % log, mimetype='text/plain')
line = Popen(["tail", "-n%i" % lcnt, log], stdout=PIPE, encoding='utf-8').communicate()[0]
wa_file = "%s/templates/R60.html" % script_dir
data = ''
try:
with open(wa_file, 'r') as f:
data = f.read()
except Exception as e:
data = "data file is empty %s\n" % wa_file
p = line.split("\t")
#print("=================================================")
#print(len(p))
if len(p) >=3 :
tdat = p[1].split(", ")
#print(tdat)
ttime = p[0].split(".")[0]
data=data.replace("<%time%>", ttime).replace("<%heart%>", tdat[1]).replace("<%breath%>", tdat[3]).replace("<%dist%>", tdat[5]).replace("<%move%>", tdat[7])
#print("=================================================")
return Response(data, mimetype='text/html')
@app.route('/tgwebapp')
def tg_webapp():
wa_file = "./templates/webapp.html"
data = ''
try:
with open(wa_file, 'r') as f:
data = f.read()
except Exception as e:
data = "webapp file is empty\n"
return Response(data, mimetype='text/html')
@app.route('/tgpsybot')
def tg_psybot():
#rurl = "https://t.me/aipsyassistantbot"
rurl = "https://www.vysotamed.ru/"
return redirect(rurl)
if __name__ == '__main__':
app.run()