| import json
import requests import logging import mysql.connector from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler
# Endpoint that get_news_from_api() requests to obtain the news list.
# NOTE(review): PAST_HOURS_OF_NEWS, CATEGORY_ITEM_MAX_SIZE and REQUEST_INTERVAL
# are referenced further down but are not defined in the visible source --
# confirm they are defined somewhere before this module is run.
REQUEST_URL = 'https://news.topurl.cn/api'
class New(object):
    """A single news item.

    Attributes set by the constructor:
        title: headline text, stored as passed in.
        url: link to the article, stored as passed in.
        id: integer parsed from the seven characters of ``url`` that end
            five characters before the end of the string.
        category: category label, stored as passed in.

    Raises ValueError when that seven-character window is not an integer.
    """

    def __init__(self, title, url, category):
        # Slice first, convert second: the id is the window url[-12:-5].
        id_digits = url[-12:-5]
        self.title = title
        self.url = url
        self.id = int(id_digits)
        self.category = category
def to_markdown_string(news_category):
    """Render categorised news as markdown text.

    Every category becomes a level-one heading, followed by one bullet
    link per news item in that category.

    :param news_category: dict mapping a category name to an iterable of
        items that expose ``title`` and ``url`` attributes
    :return: the markdown text, or None when ``news_category`` is not a dict
    """
    if not isinstance(news_category, dict):
        return None

    pieces = []
    for heading, items in news_category.items():
        pieces.append('# ' + heading + ' \n\n')
        pieces.extend('- [%s](%s) \n\n' % (item.title, item.url) for item in items)
    return ''.join(pieces)
def get_news_from_api(timeout=10):
    """Fetch the news list from the API at REQUEST_URL.

    Failures are logged and reported as an empty list so that a single
    bad poll does not abort the caller.

    :param timeout: seconds to wait for the HTTP request; without a
        timeout ``requests`` may block indefinitely
    :return: list of New objects; empty when the request fails, the HTTP
        status is not OK, the body is not the expected JSON, or the API
        answers with code 403
    """
    try:
        api = requests.get(REQUEST_URL, timeout=timeout)
    except requests.RequestException:
        logging.exception('request to the news API failed')
        return []

    logging.info('抓取内容为:%s', api.text)
    if not api.ok:
        logging.warning("访问频率过高被限制")
        return []

    try:
        response = json.loads(api.text)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        logging.warning('news API returned a body that is not valid JSON')
        return []

    if isinstance(response, dict) and response.get('code') == 403:
        logging.warning("访问频率过高被限制")
        return []

    try:
        news_list = response['data']['newsList']
    except (KeyError, TypeError):
        logging.warning('news API response has no data.newsList field')
        return []

    return [New(i['title'], i['url'], i['category']) for i in news_list]
def init_database():
    """Create the ``morning_new`` database and its ``news`` table if missing.

    The cursor and the connection are closed even when one of the
    statements fails.

    :return: None
    """
    news_table_sql = r"""
        create table if not exists `news` (
            `id` bigint(0) not null auto_increment comment 'id',
            `category` varchar(20) not null comment 'category',
            `title` varchar(500) not null default '' comment 'title of the new',
            `url` varchar(1500) not null default '' comment 'url of the new',
            `create_time` timestamp not null default CURRENT_TIMESTAMP comment 'create_time',
            primary key (`id`)
        ) charset=utf8mb4 comment 'news item';
    """
    # NOTE(review): credentials are hard-coded here and in
    # get_mysql_connection(); consider moving them to configuration.
    # No database is selected at connect time because it may not exist yet.
    conn = mysql.connector.connect(user='root', password='123456', host='', port=3306)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('CREATE DATABASE if not exists `morning_new`')
            cursor.execute('USE `morning_new`')
            cursor.execute(news_table_sql)
        finally:
            cursor.close()
    finally:
        conn.close()
def get_mysql_connection():
    """Open a new MySQL connection to the ``morning_new`` database.

    :return: the connection object returned by ``mysql.connector.connect``;
        the caller is responsible for closing it
    """
    # NOTE(review): credentials are hard-coded; consider loading them from
    # configuration instead.
    settings = {
        'user': 'root',
        'password': '123456',
        'host': '',
        'port': 3306,
        'database': 'morning_new',
    }
    return mysql.connector.connect(**settings)
def remove_duplicates(news):
    """Drop news items that are already stored or repeated in the batch.

    Items that are not New instances are discarded. When the same id
    appears more than once in ``news``, only the first occurrence is kept,
    so the result can be inserted without primary-key collisions.

    :param news: list of candidate New objects
    :return: list of New objects whose ids are not yet in the ``news``
        table, in their original order; empty when ``news`` is not a
        list or has no usable item
    """
    if not isinstance(news, list) or not news:
        return []

    # De-duplicate inside the batch first, preserving order.
    seen_ids = set()
    candidates = []
    for item in news:
        if isinstance(item, New) and item.id not in seen_ids:
            seen_ids.add(item.id)
            candidates.append(item)
    if not candidates:
        # Nothing to look up; an empty "in ()" list would be invalid SQL.
        return []

    id_list = [item.id for item in candidates]
    # Only the placeholders are interpolated; the ids are bound parameters.
    sql = r"select id from news where id in (%s)" % ','.join(['%s'] * len(id_list))

    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, id_list)
            stored_ids = {row[0] for row in cursor.fetchall()}
        finally:
            cursor.close()
    finally:
        conn.close()

    return [item for item in candidates if item.id not in stored_ids]
def save_news(news):
    """Insert the not-yet-stored news items into the ``news`` table.

    Items already stored are filtered out through remove_duplicates();
    anything that is not a New instance is skipped. Nothing is executed
    when no row remains.

    :param news: list of New objects
    :return: None
    """
    if not isinstance(news, list) or not news:
        return None

    rows = [
        (item.id, item.category, item.title, item.url)
        for item in remove_duplicates(news)
        if isinstance(item, New)
    ]
    if not rows:
        # Avoid sending an insert statement without any value tuples.
        return None

    sql = r'insert into news (id, category, title, url) values (%s, %s, %s, %s)'
    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.executemany(sql, rows)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()

    # Report the number of rows actually written, not the input size.
    logging.info('存入 %s 条新闻', len(rows))
    return None
def query_categories():
    """List the distinct categories of recently stored news.

    "Recent" means ``create_time`` within the last PAST_HOURS_OF_NEWS
    hours. The cursor and connection are closed even if the query fails.

    :return: list of category names
    """
    sql = r'select distinct category from news where create_time > (current_timestamp - interval %s hour)'
    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, (PAST_HOURS_OF_NEWS, ))
            return [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        conn.close()
def query_news():
    """Fetch the latest recent news, grouped by category.

    For every category returned by query_categories(), selects up to
    CATEGORY_ITEM_MAX_SIZE items created within the last
    PAST_HOURS_OF_NEWS hours, newest first. The cursor and connection are
    closed even if a query fails.

    :return: dict mapping each category name to a list of New objects
    """
    category_list = query_categories()
    paper = {}
    sql = r"""
        select title, url
        from news
        where category = %s
          and create_time > (current_timestamp - interval %s hour)
        order by create_time desc
        limit %s
    """
    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            for category in category_list:
                cursor.execute(sql, (category, PAST_HOURS_OF_NEWS, CATEGORY_ITEM_MAX_SIZE))
                paper[category] = [New(row[0], row[1], category) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        conn.close()
    return paper
def get_and_save_news():
    """Fetch news from the API and store it in the database.

    :return: None
    """
    save_news(get_news_from_api())
def query_and_publish_news():
    """Query the recent news and write it to a timestamped markdown file.

    The file is created in the current working directory and named after
    the current local time (``YYYYmmddHHMMSS.md``).

    :return: None
    """
    # Render before opening the file so a failure leaves no empty file behind.
    markdown = to_markdown_string(query_news())
    file_name = datetime.now().strftime('%Y%m%d%H%M%S') + '.md'
    # Explicit UTF-8: the platform default encoding may be unable to
    # represent non-ASCII titles.
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(markdown)
def main():
    """Prepare the database and run the scheduled jobs until interrupted.

    Two jobs are registered: get_and_save_news every REQUEST_INTERVAL
    seconds, and query_and_publish_news on a cron trigger at minute 0.
    """
    init_database()

    sched = BlockingScheduler()
    sched.add_job(get_and_save_news, trigger='interval', seconds=REQUEST_INTERVAL)
    sched.add_job(query_and_publish_news, trigger='cron', minute="0")

    try:
        # Blocks the calling thread until the scheduler is stopped.
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        print('Exit')
# Start the scheduler only when this file is executed as a script, not on import.
if __name__ == '__main__': main()