vscode tornado环境
conn error : (2005, "Unknown MySQL server host 'mysqldb' (0)")
error : local variable 'db' referenced before assignment
error : 'NoneType' object is not subscriptable
参考GPT和自己的思路:
根据你提供的信息,错误代码显示MySQL服务器无法识别主机“mysqldb”,这可能是因为MySQL服务器未正确配置或连接的hostname不正确。请确保MySQL服务器正常运行并连接正确。此外,第二个错误代码表示当试图使用尚未定义的变量“db”时,出现了本地变量引用错误。因此,您需要确保在使用“db”变量之前对其进行定义。最后,第三个错误代码表示尝试对“NoneType”类型的对象进行下标引用,这可能是因为在引用None时未正确处理错误。建议您检查代码并进行必要的更改,以避免这些错误。
参考GPT和自己的思路:
针对你的问题,首先需要分析错误信息所示的具体情况。根据提示,“Unknown MySQL server host 'mysqldb' (0)”这个错误是由于MySQL数据库连接时无法找到服务器而导致的。这通常是由于主机名或端口号输错或没有正确配置导致的。因此,建议您检查MySQL服务器主机名是否正确,端口号是否正确设置,并确保MySQL服务器已正确配置。如果是在本地测试环境中使用了MySQL,也需要确保MySQL服务已启动。
接着,看到错误 'NoneType' object is not subscriptable,一般情况下是由于某个变量返回了None导致的。我们需要检查一下代码逻辑,确认是否有变量可能会返回None,并在其使用前添加空值判断。
最后,针对 'local variable 'db' referenced before assignment' 这个错误,通常是由于变量在使用前未进行初始化赋值,或者在使用前已被销毁。我们需要检查一下代码的变量声明和引用逻辑,确保变量在使用前已经被正确初始化并且不会出现被销毁的情况。
18daily文件:
#!/usr/local/bin/python3
# -*- coding: utf-8 -*-
import libs.common as common
import sys
import time
import pandas as pd
import numpy as np
from sqlalchemy.types import NVARCHAR
from sqlalchemy import inspect
import datetime
import akshare as ak
import MySQLdb
# 600开头的股票是上证A股,属于大盘股
# 600开头的股票是上证A股,属于大盘股,其中6006开头的股票是最早上市的股票,
# 6016开头的股票为大盘蓝筹股;900开头的股票是上证B股;
# 000开头的股票是深证A股,001、002开头的股票也都属于深证A股,
# 其中002开头的股票是深证A股中小企业股票;
# 200开头的股票是深证B股;
# 300开头的股票是创业板股票;400开头的股票是三板市场股票。
def stock_a(code):
# print(code)
# print(type(code))
# 上证A股 # 深证A股
if code.startswith('600') or code.startswith('6006') or code.startswith('601') or code.startswith('000') or code.startswith('001') or code.startswith('002'):
return True
else:
return False
# 过滤掉 st 股票。
def stock_a_filter_st(name):
# print(code)
# print(type(code))
# 上证A股 # 深证A股
if name.find("ST") == -1:
return True
else:
return False
# 过滤价格,如果没有基本上是退市了。
def stock_a_filter_price(latest_price):
# float 在 pandas 里面判断 空。
if np.isnan(latest_price):
return False
else:
return True
####### 3.pdf 方法。宏观经济数据
# 接口全部有错误。只专注股票数据。
def stat_all(tmp_datetime):
datetime_str = (tmp_datetime).strftime("%Y-%m-%d")
datetime_int = (tmp_datetime).strftime("%Y%m%d")
print("datetime_str:", datetime_str)
print("datetime_int:", datetime_int)
# 股票列表
try:
data = ak.stock_zh_a_spot_em()
# print(data.index)
# 解决ESP 小数问题。
# data["esp"] = data["esp"].round(2) # 数据保留2位小数
data.columns = ['index', 'code', 'name', 'latest_price', 'quote_change', 'ups_downs', 'volume', 'turnover',
'amplitude', 'high', 'low', 'open', 'closed', 'quantity_ratio', 'turnover_rate', 'pe_dynamic',
'pb']
data = data.loc[data["code"].apply(stock_a)].loc[data["name"].apply(stock_a_filter_st)].loc[
data["latest_price"].apply(stock_a_filter_price)]
print(data)
data['date'] = datetime_int # 修改时间成为int类型。
# 删除老数据。
del_sql = " DELETE FROM `stock_zh_ah_name` where `date` = '%s' " % datetime_int
common.insert(del_sql)
data.set_index('code', inplace=True)
data.drop('index', axis=1, inplace=True)
print(data)
# 删除index,然后和原始数据合并。
common.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
except Exception as e:
print("error :", e)
# 龙虎榜-个股上榜统计
# 接口: stock_sina_lhb_ggtj
#
# 目标地址: http://vip.stock.finance.sina.com.cn/q/go.php/vLHBData/kind/ggtj/index.phtml
#
# 描述: 获取新浪财经-龙虎榜-个股上榜统计
#
try:
stock_sina_lhb_ggtj = ak.stock_sina_lhb_ggtj(recent_day="5")
print(stock_sina_lhb_ggtj)
stock_sina_lhb_ggtj.columns = ['code', 'name', 'ranking_times', 'sum_buy', 'sum_sell', 'net_amount', 'buy_seat',
'sell_seat']
stock_sina_lhb_ggtj = stock_sina_lhb_ggtj.loc[stock_sina_lhb_ggtj["code"].apply(stock_a)].loc[
stock_sina_lhb_ggtj["name"].apply(stock_a_filter_st)]
stock_sina_lhb_ggtj.set_index('code', inplace=True)
# data_sina_lhb.drop('index', axis=1, inplace=True)
# 删除老数据。
stock_sina_lhb_ggtj['date'] = datetime_int # 修改时间成为int类型。
# 删除老数据。
del_sql = " DELETE FROM `stock_sina_lhb_ggtj` where `date` = '%s' " % datetime_int
common.insert(del_sql)
common.insert_db(stock_sina_lhb_ggtj, "stock_sina_lhb_ggtj", True, "`date`,`code`")
except Exception as e:
print("error :", e)
# 每日统计
# 接口: stock_dzjy_mrtj
#
# 目标地址: http://data.eastmoney.com/dzjy/dzjy_mrtj.aspx
#
# 描述: 获取东方财富网-数据中心-大宗交易-每日统计
try:
print("################ tmp_datetime : " + datetime_str)
#stock_dzjy_mrtj = ak.stock_dzjy_mrtj(start_date=datetime_str, end_date=datetime_str)
stock_dzjy_mrtj = ak.stock_dzjy_mrtj(start_date=datetime_str, end_date=datetime_str)
print('((((((((((((((((((((((baijianyun')
print(stock_dzjy_mrtj)
stock_dzjy_mrtj.columns = ['index', 'trade_date', 'code', 'name', 'quote_change', 'close_price', 'average_price',
'overflow_rate', 'trade_number', 'sum_volume', 'sum_turnover',
'turnover_market_rate']
stock_dzjy_mrtj.set_index('code', inplace=True)
# data_sina_lhb.drop('index', axis=1, inplace=True)
# 删除老数据。
stock_dzjy_mrtj['date'] = datetime_int # 修改时间成为int类型。
stock_dzjy_mrtj.drop('trade_date', axis=1, inplace=True)
stock_dzjy_mrtj.drop('index', axis=1, inplace=True)
print(stock_dzjy_mrtj)
# 数据保留2位小数
try:
stock_dzjy_mrtj = stock_dzjy_mrtj.loc[stock_dzjy_mrtj["code"].apply(stock_a)].loc[
stock_dzjy_mrtj["name"].apply(stock_a_filter_st)]
stock_dzjy_mrtj["average_price"] = stock_dzjy_mrtj["average_price"].round(2)
stock_dzjy_mrtj["overflow_rate"] = stock_dzjy_mrtj["overflow_rate"].round(4)
stock_dzjy_mrtj["turnover_market_rate"] = stock_dzjy_mrtj["turnover_market_rate"].round(6)
except Exception as e:
print("round error :", e)
# 删除老数据。
del_sql = " DELETE FROM `stock_dzjy_mrtj` where `date` = '%s' " % datetime_int
common.insert(del_sql)
print(stock_dzjy_mrtj)
common.insert_db(stock_dzjy_mrtj, "stock_dzjy_mrtj", True, "`date`,`code`")
except Exception as e:
print("error :", e)
# main函数入口
if __name__ == '__main__':
# 执行数据初始化。
# 使用方法传递。
tmp_datetime = common.run_with_args(stat_all)
common文件:
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
# apk add py-mysqldb or
import platform
import datetime
import time
import sys
import os
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.types import NVARCHAR
from sqlalchemy import inspect
import pandas as pd
import traceback
import akshare as ak
# 使用环境变量获得数据库。兼容开发模式可docker模式。
MYSQL_HOST = os.environ.get('MYSQL_HOST') if (os.environ.get('MYSQL_HOST') != None) else "localhost"
MYSQL_USER = os.environ.get('MYSQL_USER') if (os.environ.get('MYSQL_USER') != None) else "root"
MYSQL_PWD = os.environ.get('MYSQL_PWD') if (os.environ.get('MYSQL_PWD') != None) else "root"
MYSQL_DB = os.environ.get('MYSQL_DB') if (os.environ.get('MYSQL_DB') != None) else "blh"
print("MYSQL_HOST :", MYSQL_HOST, ",MYSQL_USER :", MYSQL_USER, ",MYSQL_DB :", MYSQL_DB)
MYSQL_CONN_URL = "mysql+pymysql://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":3306/" + MYSQL_DB + "?charset=utf8mb4"
print("MYSQL_CONN_URL :", MYSQL_CONN_URL)
__version__ = "2.0.0"
# 每次发布时候更新。
def engine():
engine = create_engine(
MYSQL_CONN_URL,
encoding='utf8', convert_unicode=True)
return engine
def engine_to_db(to_db):
MYSQL_CONN_URL_NEW = "mysql+mysqldb://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":3306/" + to_db + "?charset=utf8mb4"
engine = create_engine(
MYSQL_CONN_URL_NEW,
encoding='utf8', convert_unicode=True)
return engine
# 通过数据库链接 engine。
def conn():
try:
db = pymysql.connect(MYSQL_HOST, MYSQL_USER, MYSQL_PWD, MYSQL_DB, charset="utf8")
# db.autocommit = True
except Exception as e:
print("conn error :", e)
db.autocommit(on=True)
return db.cursor()
# 定义通用方法函数,插入数据库表,并创建数据库主键,保证重跑数据的时候索引唯一。
def insert_db(data, table_name, write_index, primary_keys):
# 插入默认的数据库。
insert_other_db(MYSQL_DB, data, table_name, write_index, primary_keys)
# 增加一个插入到其他数据库的方法。
def insert_other_db(to_db, data, table_name, write_index, primary_keys):
# 定义engine
engine_mysql = engine_to_db(to_db)
# 使用 http://docs.sqlalchemy.org/en/latest/core/reflection.html
# 使用检查检查数据库表是否有主键。
insp = inspect(engine_mysql)
col_name_list = data.columns.tolist()
# 如果有索引,把索引增加到varchar上面。
if write_index:
# 插入到第一个位置:
col_name_list.insert(0, data.index.name)
print(col_name_list)
data.to_sql(name=table_name, con=engine_mysql, schema=to_db, if_exists='append',
dtype={col_name: NVARCHAR(length=255) for col_name in col_name_list}, index=write_index)
# print(insp.get_pk_constraint(table_name))
# print()
# print(type(insp))
# 判断是否存在主键
if insp.get_pk_constraint(table_name)['constrained_columns'] == []:
with engine_mysql.connect() as con:
# 执行数据库插入数据。
try:
con.execute('ALTER TABLE `%s` ADD PRIMARY KEY (%s);' % (table_name, primary_keys))
except Exception as e:
print("################## ADD PRIMARY KEY ERROR :", e)
# 插入数据。
def insert(sql, params=()):
with conn() as db:
print("insert sql:" + sql)
try:
db.execute(sql, params)
except Exception as e:
print("error :", e)
# 查询数据
def select(sql, params=()):
with conn() as db:
print("select sql:" + sql)
try:
db.execute(sql, params)
except Exception as e:
print("error :", e)
result = db.fetchall()
return result
# 计算数量
def select_count(sql, params=()):
with conn() as db:
print("select sql:" + sql)
try:
db.execute(sql, params)
except Exception as e:
print("error :", e)
result = db.fetchall()
# 只有一个数组中的第一个数据
if len(result) == 1:
return int(result[0][0])
else:
return 0
# 通用函数。获得日期参数。
def run_with_args(run_fun):
tmp_datetime_show = datetime.datetime.now() # 修改成默认是当日执行 + datetime.timedelta()
tmp_hour_int = int(tmp_datetime_show.strftime("%H"))
if tmp_hour_int < 12 :
# 判断如果是每天 中午 12 点之前运行,跑昨天的数据。
tmp_datetime_show = (tmp_datetime_show + datetime.timedelta(days=-1))
tmp_datetime_str = tmp_datetime_show.strftime("%Y-%m-%d %H:%M:%S.%f")
print("\n######################### hour_int %d " % tmp_hour_int)
str_db = "MYSQL_HOST :" + MYSQL_HOST + ", MYSQL_USER :" + MYSQL_USER + ", MYSQL_DB :" + MYSQL_DB
print("\n######################### " + str_db + " ######################### ")
print("\n######################### begin run %s %s #########################" % (run_fun, tmp_datetime_str))
start = time.time()
# 要支持数据重跑机制,将日期传入。循环次数
if len(sys.argv) == 3:
# python xxx.py 2017-07-01 10
tmp_year, tmp_month, tmp_day = sys.argv[1].split("-")
loop = int(sys.argv[2])
tmp_datetime = datetime.datetime(int(tmp_year), int(tmp_month), int(tmp_day))
for i in range(0, loop):
# 循环插入多次数据,重复跑历史数据使用。
# time.sleep(5)
tmp_datetime_new = tmp_datetime + datetime.timedelta(days=i)
try:
run_fun(tmp_datetime_new)
except Exception as e:
print("error :", e)
traceback.print_exc()
elif len(sys.argv) == 2:
# python xxx.py 2017-07-01
tmp_year, tmp_month, tmp_day = sys.argv[1].split("-")
tmp_datetime = datetime.datetime(int(tmp_year), int(tmp_month), int(tmp_day))
try:
run_fun(tmp_datetime)
except Exception as e:
print("error :", e)
traceback.print_exc()
else:
# tmp_datetime = datetime.datetime.now() + datetime.timedelta(days=-1)
try:
run_fun(tmp_datetime_show) # 使用当前时间
except Exception as e:
print("error :", e)
traceback.print_exc()
print("######################### finish %s , use time: %s #########################" % (
tmp_datetime_str, time.time() - start))
# 设置基础目录,每次加载使用。
bash_stock_tmp = "/data/cache/hist_data_cache/%s/%s/"
if not os.path.exists(bash_stock_tmp):
os.makedirs(bash_stock_tmp) # 创建多个文件夹结构。
print("######################### init tmp dir #########################")
# 增加读取股票缓存方法。加快处理速度。
def get_hist_data_cache(code, date_start, date_end):
cache_dir = bash_stock_tmp % (date_end[0:7], date_end)
# 如果没有文件夹创建一个。月文件夹和日文件夹。方便删除。
# print("cache_dir:", cache_dir)
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
cache_file = cache_dir + "%s^%s.gzip.pickle" % (date_end, code)
# 如果缓存存在就直接返回缓存数据。压缩方式。
if os.path.isfile(cache_file):
print("######### read from cache #########", cache_file)
return pd.read_pickle(cache_file, compression="gzip")
else:
print("######### get data, write cache #########", code, date_start, date_end)
stock = ak.stock_zh_a_hist(symbol= code, start_date=date_start, end_date=date_end, adjust="")
stock.columns = ['date', 'open', 'close', 'high', 'low', 'volume', 'amount', 'amplitude', 'quote_change',
'ups_downs', 'turnover']
if stock is None:
return None
stock = stock.sort_index(0) # 将数据按照日期排序下。
print(stock)
stock.to_pickle(cache_file, compression="gzip")
return stock
错误提示:
conn error : (2005, "Unknown MySQL server host 'mysqldb' (0)")
error : local variable 'db' referenced before assignment
################ tmp_datetime : 2023-03-19
error : 'NoneType' object is not subscriptable
确定这样的信息对吗?
MYSQL_CONN_URL_NEW = "mysql+mysqldb://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":3306/" + to_db + "?charset=utf8mb4"
我的是这样的。
jdbc:mysql://127.0.0.1:3306/my_test??charset=utf8