// Fetch yearly stats from /a1 and render them on myChart1 as a line chart.
$.get("/a1").done(function (data) {
    // data.year -> category axis labels, data.num -> line series values
    var option = {
        xAxis: { type: 'category', data: data.year },
        yAxis: { type: 'value' },
        series: [{ type: 'line', data: data.num }]
    };
    myChart1.setOption(option);
});
```python
@app.route("/a1")
def a1():
    """Return all rows of `exam1` as {"year": [...], "num": [...]} JSON for the chart.

    Assumes column 0 of exam1 is the year and column 1 is the count
    (NOTE(review): relies on SELECT * column order — confirm table schema).
    """
    conn = pymysql.connect(
        host="localhost",
        user="root",
        password="baidu123",
        database="test"
    )
    try:
        cursor = conn.cursor()
        cursor.execute("select * from exam1")
        data = cursor.fetchall()
    finally:
        # BUG FIX: the original leaked the connection on every request.
        conn.close()
    exam_map = {
        "year": [row[0] for row in data],
        "num": [row[1] for row in data],
    }
    return jsonify(exam_map)
```java
// Map side: parse one CSV record, emit (first column as key, 9th column parsed as long).
try {
String[] line = value.toString().split(",");
context.write(new Text(line[0]), new LongWritable(Long.parseLong(line[8])));
} catch (Exception e) {
// Malformed rows (too few columns or a non-numeric 9th field) are skipped silently.
return;
}
// Reduce side: aggregate all values for one key.
// NOTE(review): `min` starts at 400, so the code assumes every value is below 400 — confirm data range.
long sum = 0, count = 0,max=0,min=400;
for (LongWritable value : values) {
sum += value.get(); // sum and count are computed but never emitted below
count++;
max=Math.max(max,value.get());
// The guard makes Math.min redundant, but the result is the same either way.
if(min>value.get()) {
min=Math.min(min,value.get());
}
}
// Two output records per key: first the max, then the min.
context.write(key, new LongWritable(max));
context.write(key, new LongWritable(min));
```scala
// Spark Streaming + Kafka word count, appended to MySQL
// Row schema for the per-batch word counts written to MySQL via toDF(): batch time, word, occurrence count.
case class ka(time:String,word: String, count: Int)
/** Consume the Kafka topic "order", word-count each 5s micro-batch,
  * print the counts, and append them to a MySQL table via JDBC.
  */
def main(args: Array[String]): Unit = {
  // Local Hadoop binaries required to run Spark on Windows.
  System.setProperty("hadoop.home.dir", "C:\\Users\\hw\\Desktop\\中级实战\\资料\\hadoop-2.9.2")
  val sess = SparkSession.builder().appName("spark_kafka").master("local[2]").getOrCreate()
  val sc = sess.sparkContext
  sc.setLogLevel("ERROR")
  // Micro-batch interval: 5 seconds.
  val ssc = new StreamingContext(sc, Seconds(5))
  // Kafka consumer configuration.
  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "master:9092",
    ConsumerConfig.GROUP_ID_CONFIG -> "mygroup",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
  val topics: Array[String] = Array("order")
  val stream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
  import sess.implicits._
  // JDBC sink settings.
  val prop = new Properties()
  prop.setProperty("user", "root")
  prop.setProperty("password", "123456")
  prop.setProperty("driver", "com.mysql.jdbc.Driver")
  val url = "jdbc:mysql://localhost/test"
  stream.foreachRDD { rdd =>
    println("Time:" + new Date())
    // BUG FIX: original pattern "YY:mm:dd HH:mm:ss" used week-year (YY) and
    // minutes (mm) in the date position — it never produced a valid date.
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val counts = rdd
      .map(_.value)          // original carried (key, value, value) but only used the value
      .filter(!_.equals("")) // drop empty messages
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.foreach(println) // NOTE: prints on the executors, not the driver
    counts.map { case (w, c) => ka(sdf.format(new Date()), w, c) }.toDF()
      .write.mode(SaveMode.Append).jdbc(url, "20205080910068朱汉威", prop)
  }
  println("----------开始计算-------------")
  ssc.start()
  println("----------任务正在执行-------------")
  ssc.awaitTermination()
}
#数据库可视化
import DBHelpe
import pandas as pd
import matplotlib.pyplot as plt
import warnings
#忽略警告信息
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei']  # allow Chinese labels in matplotlib
conn = DBHelpe.MyDBHelpe()
plt.figure()  # single canvas split into a 2x2 grid below
print(conn)

# Subplot 1: total price per house type, as a bar chart.
by_type = pd.read_sql('select * from tb_lianjia', con=conn.conn)
by_type = by_type.groupby(by='type')['price'].sum()
print(by_type)
plt.subplot(2, 2, 1)
by_type.plot(kind='bar')

# Subplot 2: top-10 districts by listing count, as a pie chart.
top_counts = pd.read_sql(
    "SELECT address,count(*)as 数量 FROM tb_lianjia GROUP BY address ORDER BY 数量 DESC LIMIT 10",
    con=conn.conn)
plt.subplot(2, 2, 2)
plt.pie(top_counts['数量'], labels=top_counts['address'], autopct='%2.f%%')

# Subplot 3: top-20 districts by average price, as a horizontal bar chart.
avg_price = pd.read_sql(
    "SELECT address,avg(price) as 平均价格 from tb_lianjia GROUP BY address ORDER BY 平均价格 DESC LIMIT 20",
    con=conn.conn)
plt.subplot(2, 2, 3)
plt.barh(avg_price['address'], avg_price['平均价格'], color='r')

plt.show()
'''
爬取链家二手房数据,保存到MySQL数据库中
id, title,address,type,area,price,model
'''
import requests
import DBHelpe
from bs4 import BeautifulSoup
import time
import lxml
db = DBHelpe.MyDBHelpe()
num = int(input("请输入你爬取的页数:"))
for page in range(num):
    url = "https://sh.lianjia.com/ershoufang/pg{}/".format(page + 1)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36'}
    # BUG FIX: headers must be passed as a keyword argument — the second
    # positional parameter of requests.get() is `params`, not `headers`.
    page_text = requests.get(url, headers=headers).text
    time.sleep(3)  # be polite: pause between page fetches
    print("第" + str(page + 1) + "页开始爬取完成!!!!!!")
    # Parse the listing page.
    soup = BeautifulSoup(page_text, 'lxml')
    li_list = soup.find("ul", {"class": "sellListContent"}).find_all("li", {"class": "LOGCLICKDATA"})
    for li in li_list:
        title = li.find("div", {"class": "title"}).a.text
        print(title)
        address = li.find("div", {"class": "positionInfo"}).a.text
        # Second <a> inside positionInfo holds the district/neighbourhood part.
        address1 = li.find("div", {"class": "positionInfo"}).find_next("a").find_next_sibling("a").text
        print(address, address1)
        # houseInfo text looks like: 3室2厅 | 128.33平米 | 南 | 简装 | 高楼层(共22层) | 2011年建 | 板楼
        # NOTE(review): field positions assumed from that sample — confirm on live pages.
        info_div = li.find("div", {"class": "houseInfo"})
        parts = [p.strip() for p in info_div.text.split("|")] if info_div else []
        house_type = parts[0] if len(parts) > 0 else ""
        area = parts[1] if len(parts) > 1 else ""
        model = parts[-1] if parts else ""
        price_div = li.find("div", {"class": "totalPrice"})  # presumably the total price — verify selector
        price = price_div.text.strip() if price_div else ""
        # BUG FIX: the original tuple had only 2 values for 6 %s placeholders,
        # which raises TypeError on every row.
        # SECURITY NOTE: string-built SQL is injection-prone; DBHelpe.add()
        # only accepts a raw SQL string, so values are interpolated here.
        sql = "insert into tb_house(title,address,type,area,price,model) values('%s','%s','%s','%s','%s','%s')" % (
            title, address + address1, house_type, area, price, model)
        db.add(sql)
#MyDBHelpe
'''
面向对象进行封装,数据库操作类
'''
import pymysql
class MyDBHelpe:
    """Thin pymysql wrapper owning a single connection and cursor.

    Connection parameters are hard-coded for the local demo database.
    add/update/delete execute a statement and commit; findAll returns
    all rows of a query.
    """

    def __init__(self):
        # Connect immediately; any pymysql error propagates to the caller.
        self.conn = pymysql.connect(host="localhost", port=3306, user="root", passwd="baidu123", database="domedb")
        # One shared cursor for all operations.
        self.cur = self.conn.cursor()

    def __del__(self):
        # BUG FIX: guard with getattr — if __init__ raised before setting the
        # attributes, the interpreter still calls __del__ and the original
        # code crashed with AttributeError during garbage collection.
        cur = getattr(self, "cur", None)
        if cur is not None:
            cur.close()
        conn = getattr(self, "conn", None)
        if conn is not None:
            conn.close()
        print("数据库连接已关闭!!!!!!")

    def _execute_commit(self, sql, ok_msg):
        # Shared body of add/update/delete: execute, commit, report success.
        # Roll back on failure so the connection is left in a clean state.
        try:
            self.cur.execute(sql)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        print(ok_msg)

    def add(self, sql):
        # Insert statement; commits on success.
        self._execute_commit(sql, "数据插入成功!!!!")

    def update(self, sql):
        # Update statement; commits on success.
        self._execute_commit(sql, "数据更新成功!!!!")

    def delete(self, sql):
        # Delete statement; commits on success.
        self._execute_commit(sql, "数据删除成功!!!!")

    def findAll(self, sql):
        # Run a query and return every row as a tuple of tuples.
        self.cur.execute(sql)
        data = self.cur.fetchall()
        return data