test,百度,spark,测试不用回答


// Fetch {year: [...], num: [...]} from the /a1 endpoint and render it as a
// line chart on the pre-built ECharts instance `myChart1`.
$.get("/a1").done(function (data) {
        var option = {
            xAxis: {
                type: 'category',
                data: data.year   // x axis: one category per year
            },
            yAxis: {
                type: 'value'
            },
            series: [{
                data: data.num,   // y values aligned with data.year
                type: 'line'
            }]
        };
        myChart1.setOption(option);
    })
```python
@app.route("/a1")
def a1():
    """Return the exam1 table as {"year": [...], "num": [...]} JSON.

    Consumed by the front-end line chart ($.get("/a1")); row[0] is treated
    as the year and row[1] as the count.
    """
    conn = pymysql.connect(
        host="localhost",
        user="root",
        password="baidu123",
        database="test"

    )
    # BUG FIX: the original never closed the connection/cursor, leaking a
    # DB connection per request; close them even if the query raises.
    try:
        cursor = conn.cursor()
        cursor.execute("select * from exam1")
        data = cursor.fetchall()
    finally:
        conn.close()
    exam_map = {"year": [], "num": []}
    for row in data:
        # Index directly instead of dict.get(): the keys are known to exist,
        # and a typo should raise KeyError rather than AttributeError on None.
        exam_map["year"].append(row[0])
        exam_map["num"].append(row[1])
    return jsonify(exam_map)
```java
// Mapper body fragment: split the CSV input line and emit
// (field 0 as key -> field 8 parsed as a long).
try {
                String[] line = value.toString().split(",");
                // NOTE(review): field 8 is presumably the metric being aggregated
                // downstream — confirm against the input schema.
                context.write(new Text(line[0]), new LongWritable(Long.parseLong(line[8])));
            } catch (Exception e) {
                // Catches everything (short rows, non-numeric field 8, header
                // lines) and silently drops the record — malformed input is
                // never logged or counted.
                return;
            }

// Reducer body fragment: scan all values for a key, tracking sum, count,
// max and min, then emit max and min for the key.
// NOTE(review): sum and count are computed but never written — confirm
// whether an average output was intended. min starts at 400, so any value
// >= 400 can never become the minimum — verify 400 is a valid upper bound
// for this data.
long sum = 0, count = 0,max=0,min=400;
            for (LongWritable value : values) {
                sum += value.get();
                count++;
                max=Math.max(max,value.get());
                // The if-guard is redundant with Math.min, but harmless.
                if(min>value.get()) {
                    min=Math.min(min,value.get());
                }
            }
            // Two writes per key: first the maximum, then the minimum.
            context.write(key, new LongWritable(max));
            context.write(key, new LongWritable(min));
```scala
// spark
/** One word-count row per micro-batch: formatted timestamp, the word, and
  * its count; written to MySQL via Spark's JDBC DataFrame writer. */
case class ka(time:String,word: String, count: Int)

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\Users\\hw\\Desktop\\中级实战\\资料\\hadoop-2.9.2")
//    val conf: SparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[2]")
    val sess=SparkSession.builder().appName("spark_kafka").master("local[2]").getOrCreate()
    val sc =sess.sparkContext
    sc.setLogLevel("ERROR")
    //拉取时间
    val ssc = new StreamingContext(sc, Seconds(5))

    //kafka配置
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "master:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "mygroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    //主题
    val topics: Array[String] = Array("order")
    //kafka数据流
    val stream: InputDStream[ConsumerRecord[String, String]] = {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }
    import sess.implicits._

    val prop=new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    val url="jdbc:mysql://localhost/test"

    stream.foreachRDD(
      x => {
        println("Time:" + new Date())
        val value: RDD[(String, String,String)] = x.map(record => (record.key, record.value,record.value))
        val filter = value.filter(!_._2.equals(""))
        val words = filter.flatMap(_._2.split(" ")).map(x => (x, 1))
        val count = words.reduceByKey(_ + _)
        val sdf=new SimpleDateFormat("YY:mm:dd HH:mm:ss")
        count.foreach(println)
        count.map(x=>ka(sdf.format(new Date()),x._1,x._2)).toDF()
          .write.mode(SaveMode.Append).jdbc(url,"20205080910068朱汉威",prop)

      })

    println("----------开始计算-------------")
    ssc.start()
    println("----------任务正在执行-------------")
    ssc.awaitTermination()
  }

# Database visualization: chart the tb_lianjia listings table on a 2x2 grid.
import DBHelpe
import pandas as pd
import matplotlib.pyplot as plt
import warnings

warnings.filterwarnings('ignore')             # silence library warnings
plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels correctly

conn = DBHelpe.MyDBHelpe()
plt.figure()  # single canvas holding all subplots
print(conn)

# Subplot 1 — bar chart: total price per housing type.
listings = pd.read_sql('select *  from tb_lianjia', con=conn.conn)
price_by_type = listings.groupby(by='type')['price'].sum()
print(price_by_type)
plt.subplot(2, 2, 1)
price_by_type.plot(kind='bar')

# Subplot 2 — pie chart: ten districts with the most listings.
df1 = pd.read_sql("SELECT address,count(*)as 数量 FROM tb_lianjia GROUP BY address ORDER BY 数量 DESC LIMIT 10", con=conn.conn)
plt.subplot(2, 2, 2)
plt.pie(df1['数量'], labels=df1['address'], autopct='%2.f%%')

# Subplot 3 — horizontal bars: twenty districts with the highest average price.
df2 = pd.read_sql("SELECT address,avg(price) as 平均价格 from tb_lianjia GROUP BY address ORDER BY  平均价格 DESC LIMIT 20", con=conn.conn)
plt.subplot(2, 2, 3)
plt.barh(df2['address'], df2['平均价格'], color='r')
plt.show()
'''
Scrape Lianjia (链家) second-hand housing listings and save them to MySQL.
Target table columns: id, title, address, type, area, price, model
'''
import requests
import DBHelpe
from bs4 import BeautifulSoup
import time
import lxml


def _sql_escape(s):
    # Minimal escaping for values embedded in the INSERT below;
    # DBHelpe.add() only accepts a raw SQL string, so parameterized
    # queries are not available here.
    return s.replace("\\", "\\\\").replace("'", "\\'")


db = DBHelpe.MyDBHelpe()
num = int(input("请输入你爬取的页数:"))
for page in range(num):
    url = "https://sh.lianjia.com/ershoufang/pg{}/".format(page + 1)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36'}
    # BUG FIX: requests.get(url, headers) passed the headers dict as the
    # `params` positional argument (query string), not as HTTP headers.
    page_text = requests.get(url, headers=headers).text
    time.sleep(3)  # throttle between pages to avoid being blocked
    print("第" + str(page + 1) + "页开始爬取完成!!!!!!")
    # Parse the page source.
    soup = BeautifulSoup(page_text, 'lxml')

    li_list = soup.find("ul", {"class": "sellListContent"}).find_all("li", {"class": "LOGCLICKDATA"})
    # Extract the fields of every listing.
    for li in li_list:
        title = li.find("div", {"class": "title"}).a.text
        print(title)
        address = li.find("div", {"class": "positionInfo"}).a.text
        # The second <a> under positionInfo holds the district name.
        address1 = li.find("div", {"class": "positionInfo"}).find_next("a").find_next_sibling("a").text
        print(address, address1)
        # houseInfo text like "3室2厅 | 128.33平米 | 南 | 简装 | ..." is not
        # parsed yet; store empty strings for type/area/price/model until then.
        # BUG FIX: the original supplied a 2-tuple for six %s placeholders,
        # which raises TypeError on every row — six values are now provided.
        sql = "insert into tb_house(title,address,type,area,price,model) values('%s','%s','%s','%s','%s','%s')" % (
            _sql_escape(title), _sql_escape(address + address1), '', '', '', '')
        db.add(sql)


#MyDBHelpe
'''
面向对象进行封装,数据库操作类
'''
import pymysql
class MyDBHelpe:
    """Thin pymysql wrapper: one connection and one cursor per instance.

    Write operations (add/update/delete) execute the given SQL string and
    commit; findAll executes and returns every row.
    """
    def __init__(self):
        # NOTE(review): credentials are hard-coded — move to config/env vars.
        self.conn = pymysql.connect(host="localhost", port=3306, user="root", passwd="baidu123", database="domedb")
        # Cursor shared by all operations on this instance.
        self.cur = self.conn.cursor()

    def __del__(self):
        # BUG FIX: if __init__ raised before assigning conn/cur, the original
        # __del__ itself raised AttributeError during garbage collection.
        if hasattr(self, "cur"):
            self.cur.close()
        if hasattr(self, "conn"):
            self.conn.close()
        print("数据库连接已关闭!!!!!!")

    def _execute_commit(self, sql, ok_msg):
        # Shared body of add/update/delete: execute, commit, report success.
        self.cur.execute(sql)
        # Writes must be committed to take effect.
        self.conn.commit()
        print(ok_msg)

    def add(self, sql):
        """Execute an INSERT statement and commit."""
        self._execute_commit(sql, "数据插入成功!!!!")

    def update(self, sql):
        """Execute an UPDATE statement and commit."""
        self._execute_commit(sql, "数据更新成功!!!!")

    def delete(self, sql):
        """Execute a DELETE statement and commit."""
        self._execute_commit(sql, "数据删除成功!!!!")

    def findAll(self, sql):
        """Execute a SELECT statement and return all rows (no commit needed)."""
        self.cur.execute(sql)
        data = self.cur.fetchall()
        return data