需要将现有程序改为定时每小时的0分钟,15分钟,30分钟,45分钟自动启动通过接口读取数据后写入数据库,然后结束该程序,直到到达下个定时时间。(例:每天00:00该程序自动启动读取数据写入数据库后自动结束直到00:15,然后自动启动读取数据写入数据库后自动结束直到00:30,重复到00:45,01:00,01:15,01:30等)
# -*- coding: utf-8 -*-
import json
import time
from requests import post
from datetime import datetime, timedelta
import pymssql
import decimal
from logging import basicConfig,debug,DEBUG
import os
##################
#获取流量数据开始#
##################
baseUrl = '*********,#接口用*保密
header = {'Content-Type':'application/json'}
url = f'http://{baseUrl}/api/water/reservoidFlowOutHour/1.0' #5分钟出库流量
# 'WSPDSLLS00003702'
# 'WSPDPLLS00003702'
def data_request(url,header,body,key,station):
try:
r = post(url, headers = header , json = body)
datas = r.json()
# print(datas)
data_lst = []
if 'data' in datas:
data_lst = []
for i in range(len(datas['data'])):
# print(datetime.now())
if datas['data'][i][key] == station:
data_lst.append(datas['data'][i])
return data_lst
else:
print('返回值中无data键:',datas)
debug('返回值中无data键:'+str(datas))
return []
except OSError:
print('OSError esb 连接失败!!')
debug('OSError esb 连接失败!!')
return []
except Exception as e:
print('数据格式有误,post返回值为:'+str(datas)+' ,错误信息为:',e)
debug('数据格式有误,post返回值为:'+str(datas)+' ,错误信息为:'+str(e))
return []
def time_trans(unix):
'''
将unix时间转换成北京时间
'''
if len(str(unix)) == 13: #有毫秒时间
return datetime.fromtimestamp(int(unix)/1000)
elif len(str(unix)) == 10:
return datetime.fromtimestamp(int(unix))
else:
print('unix时间长度有误!')
# pbg_lst = data_request(url,header,body,'POINT_CODE','WSPDPLLS00003702')
# print(pbg_lst,time_trans(pbg_lst[-1]['DATA_TIME']))
##################
#获取流量数据结束#
##################
################
#数据库连接开始#
################
sql_cfg = {
'url':'*********,#接口用*保密
'usr':'U_DB_IF',
'pwd':'DB_6789054321_!',
'db_name':'SkyUni_DB_Moni_DataInterFace',
} # SQL server 查询配置
def insert_value(tables,SCTD,TM,Q):
try:
conn = pymssql.connect(sql_cfg['url'],sql_cfg['usr'],sql_cfg['pwd'],sql_cfg['db_name'],tables)
cursor = conn.cursor()
insert_sql = f"INSERT INTO {tables} (STCD,TM,Q) VALUES ('{SCTD}','{TM}','{Q}')"
cursor.execute(insert_sql)
conn.commit()
cursor.close() #关闭游标
conn.close() #关闭数据库连接
print(tables,SCTD,TM,Q,'数据更新成功')
debug(str(tables)+' '+str(SCTD)+' '+str(TM)+' '+str(Q)+' '+'数据更新成功')
except pymssql.IntegrityError:
print(tables,SCTD,TM,Q,'数据已存在,无最新数据')
debug(str(tables)+' '+str(SCTD)+' '+str(TM)+' '+str(Q)+' '+'数据已存在,无最新数据')
except Exception as e:
print('数据库连接失败!失败原因:',e)
debug('数据库连接失败!失败原因:'+str(e))
# insert_value('ST_RIVER_R_PBG','511823SWSL084',datetime.fromtimestamp(1566835200000/1000),2205.0)
################
#数据库连接结束#
################
def main():
try:
body = {"startTime":str(datetime.now())[:10]+' 00:00:00',"endTime":str(datetime.now())[:19]} #时间从今天0点到现在
print(body)
pbg_lst = data_request(url,header,body,'POINT_CODE','WSPDPLLS00003702')
if pbg_lst:
pbg_data = pbg_lst[-1] #取今天最新的一条数据
insert_value('ST_RIVER_R_PBG','511823SWSL084',datetime.strptime(str(datetime.now()), "%Y-%m-%d %H:%M:%S.%f").strftime('%Y-%m-%d %H:%M:%S'),pbg_data['DATA_VALUE'])
else:
print('数据获取失败,esb返回值为空')
debug('数据获取失败,esb返回值为空')
sxg_lst = data_request(url,header,body,'POINT_CODE','WSPDSLLS00003702')
if sxg_lst:
sxg_data = sxg_lst[-1]
insert_value('ST_RIVER_R_PBG','511823SWSL085',datetime.strptime(str(datetime.now()), "%Y-%m-%d %H:%M:%S.%f").strftime('%Y-%m-%d %H:%M:%S'),sxg_data['DATA_VALUE'])
else:
print('数据获取失败,esb返回值为空')
debug('数据获取失败,esb返回值为空')
except Exception as e:
print('数据更新失败!失败原因:',e)
debug('数据更新失败!失败原因:'+str(e))
# try:
# time_now = datetime.now()
# print('\n','#####################################','\n',
# '#本次更新时间:',str(time_now)[:19],'#','\n',
# '#####################################','\n')
# main()
# print('\n','################################################','\n',
# '#本次更新结束,请勿关闭程序!等待下一次更新...','#','\n',
# '################################################','\n')
# except Exception as e:
# debug('主程序运行失败!失败原因:'+str(e))
if __name__ == '__main__':
while True:
try:
time_now = datetime.now()
print('\n','#####################################','\n',
'#本次更新时间:',str(time_now)[:19],'#','\n',
'#####################################','\n')
main()
print('\n','################################################','\n',
'#本次更新结束,请勿关闭程序!等待下一次更新...','#','\n',
'################################################','\n')
except:
continue
time.sleep(900)
题主你好
两个方法可以选择
1 写一个bat,调用这个程序,使用window自带的定时任务即可定时调用
2 再写一个定时程序来调用这个程序
不建议修改这个程序,因为程序代码执行要隔离,各干各的事才是最好的规划。
如果您还有什么疑问,可以继续交流
修改了程序如下:
# -*- coding: utf-8 -*-
import json
import time
from requests import post
from datetime import datetime, timedelta
import pymssql
import decimal
from logging import basicConfig, debug, DEBUG
import os
import sched
##################
# 获取流量数据开始#
##################
baseUrl = '*********,#接口用*保密
header = {'Content-Type': 'application/json'}
url = f'http://{baseUrl}/api/water/reservoidFlowOutHour/1.0' # 5分钟出库流量
# 'WSPDSLLS00003702'
# 'WSPDPLLS00003702'
def data_request(url, header, body, key, station):
try:
r = post(url, headers=header, json=body)
datas = r.json()
# print(datas)
data_lst = []
if 'data' in datas:
data_lst = []
for i in range(len(datas['data'])):
# print(datetime.now())
if datas['data'][i][key] == station:
data_lst.append(datas['data'][i])
return data_lst
else:
print('返回值中无data键:', datas)
debug('返回值中无data键:' + str(datas))
return []
except OSError:
print('OSError esb 连接失败!!')
debug('OSError esb 连接失败!!')
return []
except Exception as e:
print('数据格式有误,post返回值为:' + str(datas) + ' ,错误信息为:', e)
debug('数据格式有误,post返回值为:' + str(datas) + ' ,错误信息为:' + str(e))
return []
def time_trans(unix):
'''
将unix时间转换成北京时间
'''
if len(str(unix)) == 13: # 有毫秒时间
return datetime.fromtimestamp(int(unix) / 1000)
elif len(str(unix)) == 10:
return datetime.fromtimestamp(int(unix))
else:
print('unix时间长度有误!')
# pbg_lst = data_request(url,header,body,'POINT_CODE','WSPDPLLS00003702')
# print(pbg_lst,time_trans(pbg_lst[-1]['DATA_TIME']))
##################
# 获取流量数据结束#
##################
################
# 数据库连接开始#
################
sql_cfg = {
'url': '*********,#接口用*保密
'usr':'U_DB_IF',
'pwd': 'DB_6789054321_!',
'db_name': 'SkyUni_DB_Moni_DataInterFace',
} # SQL server 查询配置
def insert_value(tables, SCTD, TM, Q):
try:
conn = pymssql.connect(sql_cfg['url'], sql_cfg['usr'], sql_cfg['pwd'], sql_cfg['db_name'], tables)
cursor = conn.cursor()
insert_sql = f"INSERT INTO {tables} (STCD,TM,Q) VALUES ('{SCTD}','{TM}','{Q}')"
cursor.execute(insert_sql)
conn.commit()
cursor.close() # 关闭游标
conn.close() # 关闭数据库连接
print(tables, SCTD, TM, Q, '数据更新成功')
debug(str(tables) + ' ' + str(SCTD) + ' ' + str(TM) + ' ' + str(Q) + ' ' + '数据更新成功')
except pymssql.IntegrityError:
print(tables, SCTD, TM, Q, '数据已存在,无最新数据')
debug(str(tables) + ' ' + str(SCTD) + ' ' + str(TM) + ' ' + str(Q) + ' ' + '数据已存在,无最新数据')
except Exception as e:
print('数据库连接失败!失败原因:', e)
debug('数据库连接失败!失败原因:' + str(e))
# insert_value('ST_RIVER_R_PBG','511823SWSL084',datetime.fromtimestamp(1566835200000/1000),2205.0)
################
# 数据库连接结束#
################
def main():
try:
body = {"startTime": str(datetime.now())[:10] + ' 00:00:00', "endTime": str(datetime.now())[:19]} # 时间从今天0点到现在
print(body)
pbg_lst = data_request(url, header, body, 'POINT_CODE', 'WSPDPLLS00003702')
if pbg_lst:
pbg_data = pbg_lst[-1] # 取今天最新的一条数据
insert_value('ST_RIVER_R_PBG', '511823SWSL084',
datetime.strptime(str(datetime.now()), "%Y-%m-%d %H:%M:%S.%f").strftime('%Y-%m-%d %H:%M:%S'),
pbg_data['DATA_VALUE'])
else:
print('数据获取失败,esb返回值为空')
debug('数据获取失败,esb返回值为空')
sxg_lst = data_request(url, header, body, 'POINT_CODE', 'WSPDSLLS00003702')
if sxg_lst:
sxg_data = sxg_lst[-1]
insert_value('ST_RIVER_R_PBG', '511823SWSL085',
datetime.strptime(str(datetime.now()), "%Y-%m-%d %H:%M:%S.%f").strftime('%Y-%m-%d %H:%M:%S'),
sxg_data['DATA_VALUE'])
else:
print('数据获取失败,esb返回值为空')
debug('数据获取失败,esb返回值为空')
except Exception as e:
print('数据更新失败!失败原因:', e)
debug('数据更新失败!失败原因:' + str(e))
# try:
# time_now = datetime.now()
# print('\n','#####################################','\n',
# '#本次更新时间:',str(time_now)[:19],'#','\n',
# '#####################################','\n')
# main()
# print('\n','################################################','\n',
# '#本次更新结束,请勿关闭程序!等待下一次更新...','#','\n',
# '################################################','\n')
# except Exception as e:
# debug('主程序运行失败!失败原因:'+str(e))
schedule = sched.scheduler(time.time, time.sleep)
def run_main(inc):
time_now = datetime.now()
print('\n', '#####################################', '\n',
'#本次更新时间:', str(time_now)[:19], '#', '\n',
'#####################################', '\n')
main()
schedule.enter(inc, 0, run_main, (inc,))
print('\n', '################################################', '\n',
'#本次更新结束,请勿关闭程序!等待下一次更新...', '#', '\n',
'################################################', '\n')
def run(inc=60):
schedule.enter(0, 0, run_main, (inc,))
schedule.run()
run(15*60)
python有个APScheduler
使用方法可以看
https://www.cnblogs.com/luxiaojun/p/6567132.html