使用 create_engine 连接 Access 数据库时报错，不知道该怎么解决。报错的语句如下：
engine = create_engine(f"microsoft+pyodbc:///?odbc_connect=Driver={{Microsoft Access Driver (*.mdb,*.accdb)}};DBQ={path1}")
import datetime
import json
import time
from urllib.parse import quote_plus

import pandas as pd
import requests
from sqlalchemy import create_engine
# Send a message
def dingmessage(x, timeout=10):
    """Post the text *x* to the robot webhook as a JSON payload.

    Args:
        x: Message text; sent as the ``content`` field of the JSON body.
        timeout: Seconds to wait for the webhook before giving up, so a
            stalled endpoint cannot block the caller indefinitely.

    Returns:
        None. Delivery failures (connection errors, timeouts, non-2xx
        responses) are printed instead of raised, so that a notification
        problem does not abort the monitoring loop that calls this function.
    """
    # NOTE(review): the URL embeds an access token; consider loading it from
    # configuration instead of keeping it in source.
    webhook = "https://www.hdghh.com/gadday/robot/weook/send?yzpe=0&yzoken=04512b??"
    # Request headers
    header = {"Content-Type": "application/json", "Charset": "UTF-8"}
    # Request payload, serialized to JSON
    message_json = json.dumps({"content": x})
    # Send the request
    try:
        response = requests.post(
            url=webhook, data=message_json, headers=header, timeout=timeout
        )
        response.raise_for_status()
    except requests.RequestException as exc:
        print('dingmessage failed:', exc)
def _build_access_url(mdb_path):
    """Return a SQLAlchemy URL for the Access database file at *mdb_path*."""
    # The driver name must match the registered ODBC driver exactly,
    # including the space after the comma.
    odbc_string = (
        "Driver={Microsoft Access Driver (*.mdb, *.accdb)};"
        f"DBQ={mdb_path}"
    )
    # "access+pyodbc" is the dialect name provided by the external
    # sqlalchemy-access package; SQLAlchemy itself has no "microsoft" dialect.
    # The pass-through ODBC string must be URL-quoted because it contains
    # characters such as ';', '*', spaces and backslashes.
    return f"access+pyodbc:///?odbc_connect={quote_plus(odbc_string)}"


def _load_recent_results(engine, limit=2000):
    """Read the rows of halm_results whose uniqueid lies in the last *limit* ids."""
    count_sql = 'select count(uniqueid) from halm_results'
    # Convert to a plain int: the value from iloc is a numpy integer, which
    # pyodbc does not accept as a bound parameter.
    total_count = int(pd.read_sql(count_sql, engine).iloc[0, 0])
    # NOTE(review): this uses the row count as the highest uniqueid, i.e. it
    # presumes ids run 1..count without gaps -- confirm against the table.
    first_id = max(total_count - limit, 0)
    sql = 'select testtime,flashmonicellvoltage,uniqueid from halm_results where uniqueid between ? and ?'
    # A tuple (not a list of scalars) is a single positional parameter set.
    return pd.read_sql(sql, engine, params=(first_id, total_count))


def _find_trend_run_end(values, run_length=7):
    """Return the index where *run_length* consecutive rises or falls complete, else None."""
    rising = 0
    falling = 0
    for idx in range(1, len(values)):
        previous, current = values[idx - 1], values[idx]
        if current > previous:
            rising += 1
            falling = 0
        elif current == previous:
            rising = 0
            falling = 0
        else:
            rising = 0
            falling += 1
        if rising == run_length or falling == run_length:
            return idx
    return None


def _estimate_downtime(test_times, alert_idx):
    """Return the time gap between row alert_idx-10 and row alert_idx-6."""
    # Clamp the earlier row to the first one when the run completes before
    # ten rows of history exist, instead of failing on a missing index.
    earlier = max(alert_idx - 10, 0)
    later = alert_idx - 6
    return pd.to_datetime(test_times.iloc[later]) - pd.to_datetime(test_times.iloc[earlier])


def check(shift):
    """Scan today's result database of every line listed in IP.csv and alert on trends.

    For each row of IP.csv (columns IP, Line, Path, Partment) the file
    ``\\\\<IP><Path>\\<YYYY-MM-DD>-<Line>-<shift>.mdb`` is opened, the most
    recent rows of ``halm_results`` are read, and ``flashmonicellvoltage`` is
    checked for 7 consecutive rises or 7 consecutive falls. The first such run
    triggers one message through ``dingmessage``; any failure while reading
    or analysing a file triggers a "文件未读取" message instead.

    Args:
        shift: Shift label used as the last component of the file name.

    Returns:
        None.
    """
    df_ip = pd.read_csv("IP.csv", usecols=['IP', 'Line', 'Path', 'Partment'])
    today = datetime.date.today().strftime("%Y-%m-%d")
    for row in df_ip.itertuples(index=False):
        ip, line, path, partment = row.IP, row.Line, row.Path, row.Partment
        filename = f"{today}-{line}-{shift}.mdb"
        path1 = fr"\\{ip}{path}\{filename}"
        try:
            engine = create_engine(_build_access_url(path1))
            try:
                df = _load_recent_results(engine)
            finally:
                # Release the ODBC connection so the remote file is not kept
                # open between polling passes.
                engine.dispose()
            # Look for an abnormal monotonic trend; an empty or too-short
            # result simply yields no alert.
            voltages = df['flashmonicellvoltage'].tolist()
            alert_idx = _find_trend_run_end(voltages)
            if alert_idx is not None:
                test_times = df['testtime']
                downtime = _estimate_downtime(test_times, alert_idx)
                dingmessage(
                    f"{partment} {line} {test_times.iloc[alert_idx]} 停机时间约 {downtime}"
                )
        except Exception as e:
            # Deliberately broad: one unreadable line must not stop the pass
            # over the remaining lines.
            dingmessage(f"{partment} {line} 文件未读取")
            print(f"{line}文件未读取", e)
def send_dingtalk_messages(timer_interval, shift):
    """Run ``check(shift)`` repeatedly, sleeping between passes.

    Args:
        timer_interval: Seconds to sleep after each pass.
        shift: Shift label forwarded unchanged to ``check``.

    This function never returns; it loops until the process is stopped.
    """
    keep_polling = True
    while keep_polling:
        check(shift)
        time.sleep(timer_interval)
# Ask the operator for the shift label, then poll every 1800 seconds.
shift_name = input('请输入班别:')
send_dingtalk_messages(1800, shift_name)