import json
import logging
from datetime import datetime

import mysql.connector
import requests
from apscheduler.schedulers.blocking import BlockingScheduler

logging.basicConfig(level='INFO')
REQUEST_URL = 'https://news.topurl.cn/api'
REQUEST_INTERVAL = 60
CATEGORY_ITEM_MAX_SIZE = 10
NEWS_MAX_SIZE = 10
PAST_HOURS_OF_NEWS = 23


class New(object):
    """A single news item."""

    def __init__(self, title, url, category):
        self.title = title
        self.url = url
        # Parse the numeric article id from the tail of the URL.
        self.id = int(url[-12:-5])
        self.category = category
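
# A note on the id slice above: it assumes article URLs end in a 7-digit id
# followed by '.html' (12 trailing characters), e.g. a hypothetical
# 'https://news.topurl.cn/article/1234567.html', so url[-12:-5] -> '1234567'.
# This is inferred from the slice itself, not from a documented URL format.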


def to_markdown_string(news_category):
    """
    Render the fetched news into a markdown document.
    :param news_category: dict mapping category name to a list of New objects
    :return: markdown text
    """
    if not isinstance(news_category, dict):
        # Return an empty string rather than None so callers can write it to a file.
        return ''
    markdown = ''
    for key in news_category.keys():
        markdown = markdown + '# ' + key + ' \n\n'
        for new in news_category.get(key):
            markdown = markdown + '- [%s](%s) \n\n' % (new.title, new.url)
    return markdown
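
# Sketch of the output, given one category with one item (names hypothetical):
#   to_markdown_string({'Tech': [New('Hello', 'https://x.cn/a/1234567.html', 'Tech')]})
# yields roughly:
#   # Tech
#
#   - [Hello](https://x.cn/a/1234567.html)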


def get_news_from_api():
    """
    Fetch the news list from the API.
    :return: list of New objects (empty on failure)
    """
    api = requests.get(REQUEST_URL)
    logging.info('Fetched content: ' + api.text)
    if not api.ok:
        logging.warning('Request failed with HTTP status %s' % api.status_code)
        return []
    response = json.loads(api.text)
    if response.get('code') == 403:
        logging.warning('Rate limited: too many requests')
        return []
    news_list = response['data']['newsList']
    return [New(i['title'], i['url'], i['category']) for i in news_list]
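
# The parsing above implies the API returns JSON shaped roughly like the
# following (inferred from this code, not from an official schema):
#   {"code": 200, "data": {"newsList": [
#       {"title": "...", "url": "https://.../1234567.html", "category": "..."}]}}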


def init_database():
    """
    Initialize the database: create the schema and table if needed.
    :return: None
    """
    conn = mysql.connector.connect(user='root', password='123456', host='127.0.0.1', port=3306)
    cursor = conn.cursor()
    cursor.execute('CREATE DATABASE if not exists `morning_new`')
    cursor.execute('USE `morning_new`')
    news_table_sql = r"""
        create table if not exists `news` (
            `id` bigint not null auto_increment comment 'id',
            `category` varchar(20) not null comment 'category',
            `title` varchar(500) not null default '' comment 'title of the news item',
            `url` varchar(1500) not null default '' comment 'url of the news item',
            `create_time` timestamp not null default CURRENT_TIMESTAMP comment 'create_time',
            primary key (`id`)
        ) charset=utf8mb4 comment 'news items';
    """
    cursor.execute(news_table_sql)
    cursor.close()
    conn.close()


def get_mysql_connection():
    """
    Open a MySQL connection to the morning_new database.
    :return: MySQL connection
    """
    return mysql.connector.connect(user='root', password='123456', host='127.0.0.1',
                                   port=3306, database='morning_new')


def remove_duplicates(news):
    """
    Drop news items whose ids are already stored in the database.
    :param news: list of New objects to deduplicate
    :return: list of New objects not yet in the database
    """
    if not isinstance(news, list) or len(news) == 0:
        return []
    id_list = [i.id for i in news if isinstance(i, New)]
    if len(id_list) == 0:
        # Guard against an empty IN () clause, which is invalid SQL.
        return []
    sql = r'select id from news where id in (%s)' % ','.join(['%s'] * len(id_list))
    conn = get_mysql_connection()
    cursor = conn.cursor()
    cursor.execute(sql, id_list)
    duplicate_id_list = [i[0] for i in cursor.fetchall()]
    cursor.close()
    conn.close()
    return [i for i in news if isinstance(i, New) and i.id not in duplicate_id_list]
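
# For example, with three ids the statement built above expands to
#   select id from news where id in (%s,%s,%s)
# and mysql.connector binds id_list into the placeholders.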


def save_news(news):
    """
    Write news items to the database, skipping ones already stored.
    :param news: list of New objects
    :return: None
    """
    if not isinstance(news, list):
        return None
    news = remove_duplicates(news)
    if len(news) <= 0:
        return None
    sql = r'insert into news (id, category, title, url) values '
    param_list = []
    for new in news:
        if not isinstance(new, New):
            continue
        sql = sql + ' (%s, %s, %s, %s), '
        param_list.extend([new.id, new.category, new.title, new.url])
    # Trim the trailing ', ' left by the loop above.
    sql = sql[0:-2]
    conn = get_mysql_connection()
    cursor = conn.cursor()
    cursor.execute(sql, param_list)
    conn.commit()
    cursor.close()
    conn.close()
    logging.info('Saved %s news items' % len(news))
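
# For two items the loop above builds (before parameter binding):
#   insert into news (id, category, title, url) values  (%s, %s, %s, %s),  (%s, %s, %s, %s)
# with param_list holding the eight values in order.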


def query_categories():
    """
    List the news categories seen in the past PAST_HOURS_OF_NEWS hours.
    :return: list of category names
    """
    sql = r'select distinct category from news where create_time > (current_timestamp - interval %s hour)'
    conn = get_mysql_connection()
    cursor = conn.cursor()
    cursor.execute(sql, (PAST_HOURS_OF_NEWS,))
    category_list = [i[0] for i in cursor.fetchall()]
    cursor.close()
    conn.close()
    return category_list


def query_news():
    """
    Query the latest news from the past PAST_HOURS_OF_NEWS hours, grouped by category.
    :return: dict mapping category name to a list of New objects
    """
    category_list = query_categories()
    paper = {}
    sql = r"""
        select title, url from news
        where category = %s
          and create_time > (current_timestamp - interval %s hour)
        order by create_time desc
        limit %s
    """
    conn = get_mysql_connection()
    cursor = conn.cursor()
    for category in category_list:
        cursor.execute(sql, (category, PAST_HOURS_OF_NEWS, CATEGORY_ITEM_MAX_SIZE))
        news = [New(i[0], i[1], category) for i in cursor.fetchall()]
        paper[category] = news
    cursor.close()
    conn.close()
    return paper
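
# The returned dict is shaped like (categories and titles hypothetical):
#   {'Tech': [New(...), ...], 'Sports': [New(...), ...]}
# with at most CATEGORY_ITEM_MAX_SIZE items per category.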


def get_and_save_news():
    """
    Fetch news from the API and store them in the database.
    :return: None
    """
    news = get_news_from_api()
    save_news(news)


def query_and_publish_news():
    """
    Query the stored news and publish them as a timestamped markdown file.
    :return: None
    """
    paper = query_news()
    dt = datetime.now()
    # Write as UTF-8 explicitly so non-ASCII titles survive on any platform.
    with open(dt.strftime('%Y%m%d%H%M%S') + '.md', 'w', encoding='utf-8') as f:
        f.write(to_markdown_string(paper))


def main():
    init_database()
    scheduler = BlockingScheduler()
    # Fetch every REQUEST_INTERVAL seconds; publish on the hour.
    scheduler.add_job(get_and_save_news, 'interval', seconds=REQUEST_INTERVAL)
    scheduler.add_job(query_and_publish_news, 'cron', minute='0')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        print('Exit')


if __name__ == '__main__':
    main()
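
# Running this script assumes (from the connection settings above) a local
# MySQL server at 127.0.0.1:3306 reachable as root/123456, plus these
# third-party packages, e.g.:
#   pip install requests mysql-connector-python APScheduler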