1.参考 #
2.配置VSCode #
2.1. 创建虚拟环境 #
# 创建虚拟环境
python -m venv .venv
# 激活虚拟环境(Windows)
.venv\Scripts\activate.bat
# 激活虚拟环境(Linux/macOS)
source .venv/Scripts/activate2.2 格式化python代码插件 #
按下Ctrl+Shift+P 选择 Preferences:OpenUserSettings(JSON)
添加
{
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
}
}2.3. 格式化python代码插件 #
VS Code 的 "Run Code"(由 Code Runner 扩展提供)可能未正确配置编码。
- 打开 VS Code 设置(
Ctrl + ,),搜索Code Runner,找到Code-runner: Executor Map。 - 在
settings.json中添加 Python 执行的编码配置:如果想在Run Code时使用当前目录的虚拟环境可以配置"code-runner.executorMap": { "python": "set PYTHONIOENCODING=utf8 && python -u $fullFileName" }(Windows 用户用"code-runner.executorMap": { "python": "set PYTHONIOENCODING=utf8 && $workspaceRoot/.venv/Scripts/python.exe -u $fullFileName", }set,Mac/Linux 用export)
3.初始化爬虫类 #
pip install DrissionPage3.1 spider.py #
# 导入logging模块,用于日志记录
import logging
# 配置日志基本设置,日志级别为INFO,日志格式包含时间、级别和消息
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
# 定义BossSpider类,负责爬取职位信息
class BossSpider:
# 定义爬取职位信息的方法
def crawl_jobs(self):
# 记录一条INFO级别的日志,表示开始爬取
logger.info("开始爬取职位信息")
# 定义运行爬虫的函数
def run_spider():
try:
# 创建BossSpider实例
spider = BossSpider()
# 调用实例的爬取方法
spider.crawl_jobs()
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"爬虫运行失败: {e}")
# 重新抛出异常会保留原始的异常堆栈信息(traceback),这样更方便后续调试和定位问题。
# raise e:抛出异常对象 e,但会丢失原始异常的堆栈信息
raise
# 判断是否为主程序入口
if __name__ == '__main__':
# 运行爬虫
run_spider()4.爬取职位信息 #
4.1. .env #
.env
# Chrome配置
CHROME_PATH=C:\Program Files\Google\Chrome\Application\chrome.exe
SEARCH_KEYWORD=AI前端4.2. spider.py #
spider.py
# 导入日志模块
import logging
# 导入操作系统相关模块
+import os
# 导入pprint用于美观打印数据
+from pprint import pprint
# 导入类型提示相关类型
+from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类
+from DrissionPage import ChromiumOptions, Chromium
# 配置日志输出格式和级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
+CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
+KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')
# 定义BossSpider爬虫类
class BossSpider:
# 初始化方法
+ def __init__(self):
# 保存Chrome浏览器路径
+ self.chrome_path = CHROME_PATH
# 初始化浏览器
+ self._setup_browser()
# 浏览器初始化方法
+ def _setup_browser(self):
+ try:
# 设置浏览器路径并保存配置
+ ChromiumOptions().set_browser_path(self.chrome_path).save()
# 创建Chromium浏览器对象
+ self.browser = Chromium()
# 获取最新的标签页对象
+ self.tab = self.browser.latest_tab
+ except Exception as e:
# 浏览器初始化失败时记录错误日志
+ logger.error(f"浏览器初始化失败: {e}")
# 抛出异常
+ raise
# 获取职位详情的方法,参数为职位卡片对象
+ def get_job_details(self, card) -> Optional[Dict]:
+ try:
# 获取职位名称
+ job_name = card.ele('.job-name').text
# 获取公司名称
+ company = card.ele('@|class=boss-info@|class=boss-name').text
# 获取城市信息
+ city = card.ele('.company-location').text
# 获取职位标签列表
+ job_info = card.ele('.tag-list')
# 获取所有标签元素
+ job_infos = job_info.eles('tag:li')
# 获取工作经验
+ experience = job_infos[0].text if len(job_infos) > 0 else ''
# 获取学历要求
+ education = job_infos[1].text if len(job_infos) > 1 else ''
# 返回职位详情字典
+ return {
+ 'job_name': job_name,
+ 'company': company,
+ 'city': city,
+ 'experience': experience,
+ 'education': education
+ }
+ except Exception as e:
# 获取职位详情失败时记录错误日志
+ logger.error(f"获取职位详情失败: {e}")
# 返回None
+ return None
# 爬取职位信息的方法
def crawl_jobs(self):
# 记录开始爬取的日志
logger.info("开始爬取职位信息")
# 打开职位搜索页面
+ self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
+ try:
# 等待职位卡片元素加载完成,超时时间10秒
+ self.tab.wait.eles_loaded('.job-card-box', timeout=10)
+ except Exception as e:
# 等待超时时记录错误日志
+ logger.error(f"等待职位卡片元素超时: {e}")
# 直接返回
+ return
# 获取所有职位卡片元素
+ cards = self.tab.eles('.job-card-box')
# 遍历每个职位卡片
+ for card in cards:
# 获取职位详情
+ job_data = self.get_job_details(card)
# 打印职位详情
+ pprint(job_data)
# 定义运行爬虫的函数
def run_spider():
try:
# 创建BossSpider实例
spider = BossSpider()
# 调用爬取方法
spider.crawl_jobs()
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"爬虫运行失败: {e}")
# 重新抛出异常
raise
# 判断是否为主程序入口
if __name__ == '__main__':
# 运行爬虫
run_spider()5.爬取岗位描述 #
5.1. spider.py #
spider.py
# 导入日志模块
import logging
# 导入操作系统相关模块
import os
# 导入pprint用于美观打印数据
from pprint import pprint
# 导入类型提示相关类型
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类
from DrissionPage import ChromiumOptions, Chromium
# 配置日志输出格式和级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')
# 定义BossSpider爬虫类
class BossSpider:
# 初始化方法
def __init__(self):
# 保存Chrome浏览器路径
self.chrome_path = CHROME_PATH
# 初始化浏览器
self._setup_browser()
# 浏览器初始化方法
def _setup_browser(self):
try:
# 设置浏览器路径并保存配置
ChromiumOptions().set_browser_path(self.chrome_path).save()
# 创建Chromium浏览器对象
self.browser = Chromium()
# 获取最新的标签页对象
self.tab = self.browser.latest_tab
except Exception as e:
# 浏览器初始化失败时记录错误日志
logger.error(f"浏览器初始化失败: {e}")
# 抛出异常
raise
# 获取职位详情的方法,参数为职位卡片对象
def get_job_details(self, card) -> Optional[Dict]:
try:
# 获取职位名称
job_name = card.ele('.job-name').text
# 获取公司名称
company = card.ele('@|class=boss-info@|class=boss-name').text
# 获取城市信息
city = card.ele('.company-location').text
# 获取职位标签列表
job_info = card.ele('.tag-list')
# 获取所有标签元素
job_infos = job_info.eles('tag:li')
# 获取工作经验
experience = job_infos[0].text if len(job_infos) > 0 else ''
# 获取学历要求
education = job_infos[1].text if len(job_infos) > 1 else ''
# 等待监听到职位详情接口的响应
+ detail_result = self.tab.listen.wait()
# 获取接口响应的body内容
+ detail_data = detail_result.response.body
# 从响应数据中提取岗位描述信息
+ post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
# 返回职位详情字典
return {
'job_name': job_name,
'company': company,
'city': city,
'experience': experience,
+ 'education': education,
+ 'description': post_description
}
except Exception as e:
# 获取职位详情失败时记录错误日志
logger.error(f"获取职位详情失败: {e}")
# 返回None
return None
# 爬取职位信息的方法
def crawl_jobs(self):
# 监听职位详情接口
+ self.tab.listen.start('wapi/zpgeek/job/detail.json')
# 记录开始爬取的日志
logger.info("开始爬取职位信息")
# 打开职位搜索页面
self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
try:
# 等待职位卡片元素加载完成,超时时间10秒
self.tab.wait.eles_loaded('.job-card-box', timeout=10)
except Exception as e:
# 等待超时时记录错误日志
logger.error(f"等待职位卡片元素超时: {e}")
# 直接返回
return
# 获取所有职位卡片元素
cards = self.tab.eles('.job-card-box')
# 遍历每个职位卡片
for card in cards:
# 获取职位详情
job_data = self.get_job_details(card)
# 打印职位详情
pprint(job_data)
# 定义运行爬虫的函数
def run_spider():
try:
# 创建BossSpider实例
spider = BossSpider()
# 调用爬取方法
spider.crawl_jobs()
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"爬虫运行失败: {e}")
# 重新抛出异常
raise
# 判断是否为主程序入口
if __name__ == '__main__':
# 运行爬虫
run_spider()6.保存数据库 #
6.1. 安装 #
pip install sqlalchemy pymysql python-dotenv 6.2. .env #
.env
# Chrome配置
CHROME_PATH=C:\Program Files\Google\Chrome\Application\chrome.exe
SEARCH_KEYWORD=AI前端
# 数据库配置
+MYSQL_HOST=localhost
+MYSQL_USER=root
+MYSQL_PASSWORD=123456
+MYSQL_PORT=3306
+MYSQL_DB=jobs6.3. spider.py #
spider.py
# 导入日志模块
import logging
# 导入操作系统相关模块
import os
# 导入pprint用于美观打印数据
from pprint import pprint
# 导入类型提示相关类型
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类
from DrissionPage import ChromiumOptions, Chromium
# 从SQLAlchemy导入数据库相关模块
+from sqlalchemy import create_engine, Column, Integer, String, Text
# 从SQLAlchemy ORM导入声明基类和会话工厂
+from sqlalchemy.orm import declarative_base,sessionmaker
# 导入上下文管理器
+from contextlib import contextmanager
# 导入环境变量加载模块
+from dotenv import load_dotenv
# 加载环境变量文件
+load_dotenv()
# 配置日志输出格式和级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 定义MySQL数据库连接配置字典
+MYSQL_CONF = {
+ 'user': os.getenv('MYSQL_USER', 'root'),
+ 'password': os.getenv('MYSQL_PASSWORD', '123456'),
+ 'host': os.getenv('MYSQL_HOST', 'localhost'),
+ 'port': int(os.getenv('MYSQL_PORT', 3306)),
+ 'db': os.getenv('MYSQL_DB', 'jobs')
+}
+
# 创建SQLAlchemy声明基类
+Base = declarative_base()
+
# 定义职位信息数据模型类
+class Job(Base):
+ """BOSS直聘职位信息表"""
# 指定数据库表名
+ __tablename__ = 'jobs'
# 主键字段
+ id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
# 职位基本信息字段
+ job_name = Column(String(128), nullable=True, comment='职位名称')
+ company = Column(String(128), nullable=True, comment='公司名称')
+ city = Column(String(64), nullable=True, comment='工作城市')
# 职位要求字段
+ experience = Column(String(64), nullable=True, comment='工作经验要求')
+ education = Column(String(64), nullable=True, comment='学历要求')
# 详细描述字段
+ description = Column(Text, nullable=True, comment='职位描述')
+
# 定义对象的字符串表示方法
+ def __repr__(self):
+ """返回职位的字符串表示"""
+ return f"<Job(position='{self.position}', company='{self.company}', city='{self.city}')>"
+
# 创建数据库引擎对象
+engine = create_engine(
+ f"mysql+pymysql://{MYSQL_CONF['user']}:{MYSQL_CONF['password']}"
+ f"@{MYSQL_CONF['host']}:{MYSQL_CONF['port']}/{MYSQL_CONF['db']}?charset=utf8mb4"
+)
+
# 创建数据库会话工厂
+Session = sessionmaker(bind=engine)
+
# 定义数据库会话上下文管理器装饰器
+@contextmanager
+def get_db_session():
+ """数据库会话上下文管理器"""
# 创建数据库会话
+ session = Session()
+ try:
# 返回会话对象
+ yield session
# 提交事务
+ session.commit()
+ except Exception as e:
# 发生异常时回滚事务
+ session.rollback()
# 重新抛出异常
+ raise
+ finally:
# 确保会话被关闭
+ session.close()
+
# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
+
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
+
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')
# 定义BossSpider爬虫类
class BossSpider:
# 初始化方法
def __init__(self):
# 保存Chrome浏览器路径
self.chrome_path = CHROME_PATH
# 初始化浏览器
self._setup_browser()
# 浏览器初始化方法
def _setup_browser(self):
try:
# 设置浏览器路径并保存配置
ChromiumOptions().set_browser_path(self.chrome_path).save()
# 创建Chromium浏览器对象
self.browser = Chromium()
# 获取最新的标签页对象
self.tab = self.browser.latest_tab
except Exception as e:
# 浏览器初始化失败时记录错误日志
logger.error(f"浏览器初始化失败: {e}")
# 抛出异常
raise
# 获取职位详情的方法,参数为职位卡片对象
def get_job_details(self, card) -> Optional[Dict]:
try:
# 获取职位名称
job_name = card.ele('.job-name').text
# 获取公司名称
company = card.ele('@|class=boss-info@|class=boss-name').text
# 获取城市信息
city = card.ele('.company-location').text
# 打印进度
+ logger.info(f"{company} {job_name}")
# 获取职位标签列表
job_info = card.ele('.tag-list')
# 获取所有标签元素
job_infos = job_info.eles('tag:li')
# 获取工作经验
experience = job_infos[0].text if len(job_infos) > 0 else ''
# 获取学历要求
education = job_infos[1].text if len(job_infos) > 1 else ''
# 点击职位卡片
card.click()
# 等待1秒
self.browser.wait(1)
# 等待监听到职位详情接口的响应
detail_result = self.tab.listen.wait()
# 获取接口响应的body内容
detail_data = detail_result.response.body
# 从响应数据中提取岗位描述信息
post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
# 返回职位详情字典
return {
'job_name': job_name,
'company': company,
'city': city,
'experience': experience,
'education': education,
'description': post_description
}
except Exception as e:
# 获取职位详情失败时记录错误日志
logger.error(f"获取职位详情失败: {e}")
# 返回None
return None
# 爬取职位信息的方法
def crawl_jobs(self):
# 创建数据库表结构
+ Base.metadata.create_all(engine)
# 监听职位详情接口
self.tab.listen.start('wapi/zpgeek/job/detail.json')
# 记录开始爬取的日志
logger.info("开始爬取职位信息")
# 打开职位搜索页面
self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
try:
# 等待职位卡片元素加载完成,超时时间10秒
self.tab.wait.eles_loaded('.job-card-box', timeout=10)
except Exception as e:
# 等待超时时记录错误日志
logger.error(f"等待职位卡片元素超时: {e}")
# 直接返回
return
+
# 使用数据库会话上下文管理器
+ with get_db_session() as session:
# 获取所有职位卡片元素
+ cards = self.tab.eles('.job-card-box')
# 遍历每个职位卡片
+ for card in cards:
+ # 点击职位卡片
+ card.click()
+ # 等待1秒
+ self.browser.wait(1)
+ # 获取职位详情
+ job_data = self.get_job_details(card)
+ # 如果获取到职位数据,则添加到数据库会话中
+ if job_data:
+ session.add(Job(**job_data))
+ # 等待1秒
+ self.tab.wait(1)
# 定义运行爬虫的函数
def run_spider():
try:
# 创建BossSpider实例
spider = BossSpider()
# 调用爬取方法
spider.crawl_jobs()
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"爬虫运行失败: {e}")
# 重新抛出异常
raise
# 判断是否为主程序入口
if __name__ == '__main__':
# 运行爬虫
run_spider()7.分页处理 #
7.1. spider.py #
spider.py
# 导入日志模块,用于记录程序运行过程中的各种信息
import logging
# 导入操作系统相关模块,用于获取环境变量和系统路径
import os
# 导入pprint用于美观打印数据,便于调试时查看复杂数据结构
from pprint import pprint
# 导入类型提示相关类型,用于函数参数和返回值的类型注解
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类,用于控制Chrome浏览器
from DrissionPage import ChromiumOptions, Chromium
# 从SQLAlchemy导入数据库相关模块,用于数据库连接和表结构定义
from sqlalchemy import create_engine, Column, Integer, String, Text
# 从SQLAlchemy ORM导入声明基类和会话工厂,用于ORM操作
from sqlalchemy.orm import declarative_base,sessionmaker
# 导入上下文管理器,用于自动管理资源的获取和释放
from contextlib import contextmanager
# 导入环境变量加载模块,用于从.env文件加载配置
from dotenv import load_dotenv
# 加载环境变量文件,读取.env文件中的配置信息
load_dotenv()
# 配置日志输出格式和级别,设置日志的时间格式、级别和输出格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 定义MySQL数据库连接配置字典,从环境变量获取数据库连接参数
MYSQL_CONF = {
'user': os.getenv('MYSQL_USER', 'root'),
'password': os.getenv('MYSQL_PASSWORD', '123456'),
'host': os.getenv('MYSQL_HOST', 'localhost'),
'port': int(os.getenv('MYSQL_PORT', 3306)),
'db': os.getenv('MYSQL_DB', 'jobs')
}
# 创建SQLAlchemy声明基类,所有数据模型类都需要继承这个基类
Base = declarative_base()
# 定义职位信息数据模型类,对应数据库中的jobs表
class Job(Base):
"""BOSS直聘职位信息表"""
# 指定数据库表名,定义该模型对应的数据库表名称
__tablename__ = 'jobs'
# 主键字段,自增的整数类型主键
id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
# 职位基本信息字段,存储职位名称、公司名称和工作城市
job_name = Column(String(128), nullable=True, comment='职位名称')
company = Column(String(128), nullable=True, comment='公司名称')
city = Column(String(64), nullable=True, comment='工作城市')
# 职位要求字段,存储工作经验要求和学历要求
experience = Column(String(64), nullable=True, comment='工作经验要求')
education = Column(String(64), nullable=True, comment='学历要求')
# 详细描述字段,存储完整的职位描述信息
description = Column(Text, nullable=True, comment='职位描述')
# 定义对象的字符串表示方法,用于调试时显示对象信息
def __repr__(self):
"""返回职位的字符串表示"""
return f"<Job(position='{self.position}', company='{self.company}', city='{self.city}')>"
# 创建数据库引擎对象,建立与MySQL数据库的连接
engine = create_engine(
f"mysql+pymysql://{MYSQL_CONF['user']}:{MYSQL_CONF['password']}"
f"@{MYSQL_CONF['host']}:{MYSQL_CONF['port']}/{MYSQL_CONF['db']}?charset=utf8mb4"
)
# 创建数据库会话工厂,用于生成数据库会话对象
Session = sessionmaker(bind=engine)
# 定义数据库会话上下文管理器装饰器,自动管理数据库会话的生命周期
@contextmanager
def get_db_session():
"""数据库会话上下文管理器"""
# 创建数据库会话对象
session = Session()
try:
# 返回会话对象给调用者使用
yield session
# 如果没有异常发生,提交事务保存数据
session.commit()
except Exception as e:
# 发生异常时回滚事务,撤销未提交的更改
session.rollback()
# 重新抛出异常,让上层代码处理
raise
finally:
# 确保会话被关闭,释放数据库连接资源
session.close()
# 获取以当前模块名为名称的logger对象,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')
# 定义BossSpider爬虫类,负责爬取BOSS直聘网站的职位信息
class BossSpider:
# 初始化方法,设置爬虫的基本配置
def __init__(self):
# 保存Chrome浏览器路径到实例变量
self.chrome_path = CHROME_PATH
# 调用浏览器初始化方法
self._setup_browser()
# 浏览器初始化方法,配置并启动Chrome浏览器
def _setup_browser(self):
try:
# 设置浏览器路径并保存配置到DrissionPage
ChromiumOptions().set_browser_path(self.chrome_path).save()
# 创建Chromium浏览器对象
self.browser = Chromium()
# 获取最新的标签页对象,用于后续的页面操作
self.tab = self.browser.latest_tab
except Exception as e:
# 浏览器初始化失败时记录错误日志
logger.error(f"浏览器初始化失败: {e}")
# 抛出异常,终止程序执行
raise
# 获取职位详情的方法,从职位卡片中提取详细信息
def get_job_details(self, card) -> Optional[Dict]:
try:
# 获取职位名称,通过CSS选择器定位元素并提取文本
job_name = card.ele('.job-name').text
# 获取公司名称,使用复合选择器定位boss信息区域
company = card.ele('@|class=boss-info@|class=boss-name').text
# 获取城市信息,从公司位置元素中提取
city = card.ele('.company-location').text
# 获取职位标签列表容器元素
job_info = card.ele('.tag-list')
# 获取所有标签元素,这些标签包含经验和学历要求
job_infos = job_info.eles('tag:li')
# 获取工作经验要求,如果存在标签则取第一个
experience = job_infos[0].text if len(job_infos) > 0 else ''
# 获取学历要求,如果存在第二个标签则取第二个
education = job_infos[1].text if len(job_infos) > 1 else ''
# 等待监听到职位详情接口的响应,获取API数据
detail_result = self.tab.listen.wait()
# 获取接口响应的body内容,包含职位详细信息
detail_data = detail_result.response.body
# 从响应数据中提取岗位描述信息,使用嵌套字典访问
post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
# 返回职位详情字典,包含所有提取的信息
return {
'job_name': job_name,
'company': company,
'city': city,
'experience': experience,
'education': education,
'description': post_description
}
except Exception as e:
# 获取职位详情失败时记录错误日志
logger.error(f"获取职位详情失败: {e}")
# 返回None表示获取失败
return None
# 爬取职位信息的主要方法,负责整个爬取流程
def crawl_jobs(self):
# 创建数据库表结构,如果表不存在则创建
Base.metadata.create_all(engine)
# 监听职位详情接口,准备捕获API响应
self.tab.listen.start('wapi/zpgeek/job/detail.json')
# 记录开始爬取的日志信息
logger.info("开始爬取职位信息")
# 打开职位搜索页面,使用关键词进行搜索
self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
# 打印标签页对象信息,用于调试
pprint(self.tab)
try:
# 等待职位卡片元素加载完成,超时时间10秒
self.tab.wait.eles_loaded('.job-card-box', timeout=10)
except Exception as e:
# 等待超时时记录错误日志
logger.error(f"等待职位卡片元素超时: {e}")
# 直接返回
return
+ processed_count = 0 # 已处理的元素数量
# 使用数据库会话上下文管理器
with get_db_session() as session:
+ while True:
+ # 获取所有职位卡片元素
+ cards = self.tab.eles('.job-card-box')
+ current_count = len(cards)
+ if current_count <= processed_count:
+ logger.info(f"已加载完所有职位,共 {current_count} 个")
+ break
+ # 遍历每个职位卡片
+ for card in cards[processed_count:]:
# 点击职位卡片,触发详情页面加载
card.click()
# 等待1秒,让页面有时间响应点击事件
self.browser.wait(1)
# 获取职位详情
job_data = self.get_job_details(card)
# 如果获取到职位数据,则添加到数据库会话中
if job_data:
session.add(Job(**job_data))
# 等待1秒
self.tab.wait(1)
+ processed_count = current_count
+ print(f"已处理 {processed_count} 个职位")
+ self.tab.scroll.to_bottom()
+ self.tab.wait(2)
# 定义运行爬虫的函数
def run_spider():
try:
# 创建BossSpider实例
spider = BossSpider()
# 调用爬取方法
spider.crawl_jobs()
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"爬虫运行失败: {e}")
# 重新抛出异常
raise
# 判断是否为主程序入口
if __name__ == '__main__':
# 运行爬虫
run_spider()8.定时任务 #
pip install schedule8.1. scheduler.py #
scheduler.py
# 导入定时任务调度库,用于设置定时执行任务
import schedule
# 导入时间模块并重命名为t,用于控制程序休眠时间
import time as t
# 导入操作系统模块,用于获取环境变量
import os
# 从dotenv库导入环境变量加载函数
from dotenv import load_dotenv
# 从spider模块导入爬虫运行函数
from spider import run_spider
# 加载.env文件中的环境变量到系统环境
load_dotenv()
# 判断是否为主程序入口
if __name__ == '__main__':
# 从环境变量获取定时执行时间,默认为每天05:00
schedule_time = os.getenv('SCHEDULE_TIME', '05:00')
# 设置每天在指定时间执行爬虫任务
schedule.every().day.at(schedule_time).do(run_spider)
# 打印启动信息,显示定时任务的执行时间
print(f"计划任务已启动,每天 {schedule_time} 执行爬虫。")
# 无限循环,持续检查是否有待执行的任务
while True:
# 检查并执行到期的定时任务
schedule.run_pending()
# 程序休眠60秒,避免过度占用CPU资源
t.sleep(60) 9.Web服务 #
pip install flask9.1. server.py #
server.py
# 从Flask框架导入Flask应用类和jsonify函数,用于创建Web应用和返回JSON响应
from flask import Flask, jsonify
# 从database模块导入数据库会话管理、Job模型和数据库初始化函数
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 导入Response类,用于创建HTTP响应
from flask import Response
# 导入日志模块,用于记录程序运行过程中的各种信息
import logging
# 配置日志输出格式和级别,设置日志的时间格式、级别和输出格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)
# 创建Flask应用实例,用于处理HTTP请求和响应
app = Flask(__name__)
# 定义GET请求路由,用于获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
try:
# 使用数据库会话上下文管理器,自动管理数据库连接的获取和释放
with get_db_session() as session:
# 查询数据库中的所有职位记录
jobs = session.query(Job).all()
# 创建空列表用于存储格式化后的职位数据
jobs_list = []
# 遍历每个职位对象,将其转换为字典格式
for job in jobs:
jobs_list.append({
'id': job.id,
'job_name': job.job_name,
'company': job.company,
'city': job.city,
'experience': job.experience,
'education': job.education,
'description': job.description
})
# 记录成功获取职位信息的日志
logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
# 返回JSON格式的成功响应,包含职位数据和总数
# return jsonify({
# 'success': True,
# 'data': jobs_list,
# 'total': len(jobs_list)
# })
return Response(
json.dumps(jobs_list, ensure_ascii=False),
mimetype='application/json'
)
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"获取职位列表失败: {e}")
# 返回JSON格式的错误响应,状态码为500
return jsonify({
'success': False,
'error': str(e)
}), 500
# 判断是否为主程序入口
if __name__ == '__main__':
# 记录启动服务的日志信息
logger.info("启动Flask服务...")
# 启动Flask应用,监听所有网络接口的5000端口,开启调试模式
app.run(host='0.0.0.0', port=5000, debug=True)
9.2. database.py #
database.py
# 从SQLAlchemy导入数据库引擎创建、列定义、整数类型、字符串类型和文本类型
from sqlalchemy import create_engine, Column, Integer, String, Text
# 从SQLAlchemy ORM导入声明基类和会话工厂
from sqlalchemy.orm import declarative_base, sessionmaker
# 导入上下文管理器,用于自动管理资源的获取和释放
from contextlib import contextmanager
# 导入操作系统模块,用于获取环境变量
import os
# 从dotenv库导入环境变量加载函数
from dotenv import load_dotenv
# 加载.env文件中的环境变量到系统环境
load_dotenv()
# 定义MySQL数据库连接配置字典,从环境变量获取配置信息
MYSQL_CONF = {
# 数据库用户名,默认为root
'user': os.getenv('MYSQL_USER', 'root'),
# 数据库密码,默认为123456
'password': os.getenv('MYSQL_PASSWORD', '123456'),
# 数据库主机地址,默认为localhost
'host': os.getenv('MYSQL_HOST', 'localhost'),
# 数据库端口号,默认为3306
'port': int(os.getenv('MYSQL_PORT', 3306)),
# 数据库名称,默认为jobs
'db': os.getenv('MYSQL_DB', 'jobs')
}
# 创建SQLAlchemy声明基类,用于定义数据库表模型
Base = declarative_base()
# 定义Job职位信息表模型类,继承自Base
class Job(Base):
# 指定数据库表名为jobs
__tablename__ = 'jobs'
# 定义主键ID字段,整数类型,自动递增
id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
# 定义职位名称字段,字符串类型,最大长度128,可为空
job_name = Column(String(128), nullable=True, comment='职位名称')
# 定义公司名称字段,字符串类型,最大长度128,可为空
company = Column(String(128), nullable=True, comment='公司名称')
# 定义工作城市字段,字符串类型,最大长度64,可为空
city = Column(String(64), nullable=True, comment='工作城市')
# 定义工作经验要求字段,字符串类型,最大长度64,可为空
experience = Column(String(64), nullable=True, comment='工作经验要求')
# 定义学历要求字段,字符串类型,最大长度64,可为空
education = Column(String(64), nullable=True, comment='学历要求')
# 定义职位描述字段,文本类型,可为空
description = Column(Text, nullable=True, comment='职位描述')
# 定义对象的字符串表示方法,用于调试时显示对象信息
def __repr__(self):
return f"<Job(job_name='{self.job_name}', company='{self.company}', city='{self.city}')>"
# 创建数据库引擎,使用MySQL连接配置构建连接字符串
engine = create_engine(
# 构建MySQL连接URL,包含用户名、密码、主机、端口、数据库名和字符集
f"mysql+pymysql://{MYSQL_CONF['user']}:{MYSQL_CONF['password']}"
f"@{MYSQL_CONF['host']}:{MYSQL_CONF['port']}/{MYSQL_CONF['db']}?charset=utf8mb4"
)
# 根据模型定义创建数据库表
Base.metadata.create_all(engine)
# 创建会话工厂,绑定到数据库引擎
Session = sessionmaker(bind=engine)
# 定义数据库会话上下文管理器,自动管理会话的创建、提交、回滚和关闭
@contextmanager
def get_db_session():
# 创建新的数据库会话
session = Session()
try:
# 返回会话对象给调用者使用
yield session
# 提交事务,保存所有更改
session.commit()
except Exception as e:
# 发生异常时回滚事务,撤销所有更改
session.rollback()
# 重新抛出异常
raise
finally:
# 无论是否发生异常,都要关闭会话
session.close()
9.3. spider.py #
spider.py
# 导入日志模块,用于记录程序运行过程中的各种信息
import logging
# 导入操作系统相关模块,用于获取环境变量和系统路径
import os
# 导入pprint用于美观打印数据,便于调试时查看复杂数据结构
from pprint import pprint
# 导入类型提示相关类型,用于函数参数和返回值的类型注解
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类,用于控制Chrome浏览器
from DrissionPage import ChromiumOptions, Chromium
# 导入环境变量加载模块,用于从.env文件加载配置
from dotenv import load_dotenv
# 加载环境变量文件,读取.env文件中的配置信息
load_dotenv()
# 从database.py导入get_db_session、Job
+from database import get_db_session, Job
# 配置日志输出格式和级别,设置日志的时间格式、级别和输出格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')
# 定义BossSpider爬虫类,负责爬取BOSS直聘网站的职位信息
class BossSpider:
# 初始化方法,设置爬虫的基本配置
def __init__(self):
# 保存Chrome浏览器路径到实例变量
self.chrome_path = CHROME_PATH
# 调用浏览器初始化方法
self._setup_browser()
# 浏览器初始化方法,配置并启动Chrome浏览器
def _setup_browser(self):
try:
# 设置浏览器路径并保存配置到DrissionPage
ChromiumOptions().set_browser_path(self.chrome_path).save()
# 创建Chromium浏览器对象
self.browser = Chromium()
# 获取最新的标签页对象,用于后续的页面操作
self.tab = self.browser.latest_tab
except Exception as e:
# 浏览器初始化失败时记录错误日志
logger.error(f"浏览器初始化失败: {e}")
# 抛出异常,终止程序执行
raise
# 获取职位详情的方法,从职位卡片中提取详细信息
def get_job_details(self, card) -> Optional[Dict]:
try:
# 获取职位名称,通过CSS选择器定位元素并提取文本
job_name = card.ele('.job-name').text
# 获取公司名称,使用复合选择器定位boss信息区域
company = card.ele('@|class=boss-info@|class=boss-name').text
# 获取城市信息,从公司位置元素中提取
city = card.ele('.company-location').text
# 获取职位标签列表容器元素
job_info = card.ele('.tag-list')
# 获取所有标签元素,这些标签包含经验和学历要求
job_infos = job_info.eles('tag:li')
# 获取工作经验要求,如果存在标签则取第一个
experience = job_infos[0].text if len(job_infos) > 0 else ''
# 获取学历要求,如果存在第二个标签则取第二个
education = job_infos[1].text if len(job_infos) > 1 else ''
# 等待监听到职位详情接口的响应,获取API数据
detail_result = self.tab.listen.wait()
# 点击职位卡片,触发详情页面加载
card.click()
# 等待1秒,让页面有时间响应点击事件
self.browser.wait(1)
# 获取接口响应的body内容,包含职位详细信息
detail_data = detail_result.response.body
# 从响应数据中提取岗位描述信息,使用嵌套字典访问
post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
# 返回职位详情字典,包含所有提取的信息
return {
'job_name': job_name,
'company': company,
'city': city,
'experience': experience,
'education': education,
'description': post_description
}
except Exception as e:
# 获取职位详情失败时记录错误日志
logger.error(f"获取职位详情失败: {e}")
# 返回None表示获取失败
return None
# 爬取职位信息的主要方法,负责整个爬取流程
def crawl_jobs(self):
# 记录开始爬取的日志信息
logger.info("开始爬取职位信息")
# 打开职位搜索页面,使用关键词进行搜索
self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
# 打印标签页对象信息,用于调试
pprint(self.tab)
try:
# 等待职位卡片元素加载完成,超时时间10秒
self.tab.wait.eles_loaded('.job-card-box', timeout=10)
except Exception as e:
# 等待超时时记录错误日志
logger.error(f"等待职位卡片元素超时: {e}")
# 直接返回
return
processed_count = 0 # 已处理的元素数量
# 使用数据库会话上下文管理器
with get_db_session() as session:
while True:
# 获取所有职位卡片元素
cards = self.tab.eles('.job-card-box')
current_count = len(cards)
if current_count <= processed_count:
logger.info(f"已加载完所有职位,共 {current_count} 个")
break
# 遍历每个职位卡片
for card in cards[processed_count:]:
# 获取职位详情
job_data = self.get_job_details(card)
# 如果获取到职位数据,则添加到数据库会话中
if job_data:
session.add(Job(**job_data))
# 等待1秒
self.tab.wait(1)
processed_count = current_count
print(f"已处理 {processed_count} 个职位")
self.tab.scroll.to_bottom()
self.tab.wait(2)
# 定义运行爬虫的函数
def run_spider():
try:
# 创建BossSpider实例
spider = BossSpider()
# 调用爬取方法
spider.crawl_jobs()
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"爬虫运行失败: {e}")
# 重新抛出异常
raise
# 判断是否为主程序入口
if __name__ == '__main__':
# 运行爬虫
run_spider()10.上传简历 #
pip install pdfplumber python-docx10.1. job.html #
templates/job.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>简历上传</title>
</head>
<body>
<h2>上传简历(仅支持PDF和Word)</h2>
<form id="uploadForm" enctype="multipart/form-data">
<input type="file" name="resume" accept=".pdf,.docx" required />
<button type="submit">上传</button>
</form>
<h3>简历内容:</h3>
<pre id="resumeText"></pre>
<script>
document.getElementById('uploadForm').onsubmit = async function (e) {
e.preventDefault();
const formData = new FormData(this);
const res = await fetch('/job/upload', {
method: 'POST',
body: formData
});
const data = await res.json();
if (data.success) {
document.getElementById('resumeText').textContent = data.text;
} else {
document.getElementById('resumeText').textContent = '解析失败:' + data.error;
}
}
</script>
</body>
</html>10.2. server.py #
server.py
# 导入uuid模块,用于生成唯一文件名
+import uuid
# 从Flask框架导入Flask、jsonify、request、render_template
+from flask import Flask, jsonify, request, render_template
# 从database模块导入数据库会话管理器和Job模型
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 从Flask框架导入Response类,用于自定义HTTP响应
from flask import Response
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于文件和路径操作
+import os
# 导入pdfplumber库,用于解析pdf文档
+import pdfplumber
# 导入Document类,用于解析docx文档
+from docx import Document
# 配置日志输出格式和日志级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前模块的logger对象
logger = logging.getLogger(__name__)
# 创建Flask应用实例
app = Flask(__name__)
# 设置上传文件保存目录
+app.config['UPLOAD_FOLDER'] = 'uploads'
# 设置最大上传文件大小为10MB
+app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
# 允许上传的文件扩展名集合
+ALLOWED_EXTENSIONS = {'pdf', 'docx'}
+
# 如果上传目录不存在则创建
+if not os.path.exists(app.config['UPLOAD_FOLDER']):
+ os.makedirs(app.config['UPLOAD_FOLDER'])
+
# 判断文件扩展名是否允许上传
def allowed_file(ext):
return ext in ALLOWED_EXTENSIONS
# 定义GET请求路由,返回职位上传页面
+@app.route('/job', methods=['GET'])
+def job_upload_page():
+ return render_template('job.html')
+
# 定义POST请求路由,处理简历文件上传
+@app.route('/job/upload', methods=['POST'])
+def upload_resume():
# 检查请求中是否包含文件
+ if 'resume' not in request.files:
+ return jsonify({'success': False, 'error': '未选择文件'})
# 获取上传的文件对象
+ file = request.files['resume']
# 检查文件名是否为空
+ if file.filename == '':
+ return jsonify({'success': False, 'error': '未选择文件'})
+ # 获取文件扩展名并转为小写
+ ext = os.path.splitext(file.filename)[1].lower().lstrip(".")
# 检查文件类型是否合法
+ if not allowed_file(ext):
+ return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
# 生成唯一文件名
+ filename = f"{uuid.uuid4().hex}.{ext}"
# 拼接保存路径
+ save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# 保存文件到指定路径
+ file.save(save_path)
+ try:
# 初始化文本内容为空
+ text = ''
# 如果是pdf文件,使用pdfplumber解析
+ if ext == 'pdf':
+ with pdfplumber.open(save_path) as pdf:
+ text = "\n".join(page.extract_text() for page in pdf.pages)
# 如果是docx或doc文件,使用python-docx解析
+ elif ext == 'docx':
+ doc = Document(save_path)
+ text_list = []
+ for para in doc.paragraphs:
+ text_list.append(para.text)
+ text = "\n".join(text_list)
# 其他文件类型不支持
+ else:
+ return jsonify({'success': False, 'error': '文件类型不支持'})
# 返回解析后的文本内容
+ return jsonify({'success': True, 'text': text.strip()})
+ except Exception as e:
# 解析失败时记录错误日志
+ logger.error(f'简历解析失败: {e}')
# 返回错误信息
+ return jsonify({'success': False, 'error': str(e)})
+
# 定义GET请求路由,获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
try:
# 使用数据库会话上下文管理器,自动管理数据库连接
with get_db_session() as session:
# 查询所有职位记录
jobs = session.query(Job).all()
# 创建空列表用于存储职位数据
jobs_list = []
# 遍历每个职位对象,转换为字典格式
for job in jobs:
jobs_list.append({
'id': job.id,
'job_name': job.job_name,
'company': job.company,
'city': job.city,
'experience': job.experience,
'education': job.education,
'description': job.description
})
# 记录获取职位信息的日志
logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
# 返回JSON格式的职位数据
return Response(
+ json.dumps(jobs_list, ensure_ascii=False),
+ mimetype='application/json'
)
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"获取职位列表失败: {e}")
# 返回错误响应,状态码500
return jsonify({
'success': False,
'error': str(e)
}), 500
+
# 判断是否为主程序入口
if __name__ == '__main__':
# 记录服务启动日志
logger.info("启动Flask服务...")
# 启动Flask应用,监听所有IP的5000端口,开启调试模式
app.run(host='0.0.0.0', port=5000, debug=True)
11.匹配分析 #
pip install openai11.1. .env #
.env
# Chrome配置
CHROME_PATH=C:\Program Files\Google\Chrome\Application\chrome.exe
SEARCH_KEYWORD=AI前端
# 数据库配置
MYSQL_HOST=localhost
MYSQL_USER=root
MYSQL_PASSWORD=123456
MYSQL_PORT=3306
MYSQL_DB=jobs
+
# API_KEY
+DEEPSEEK_API_KEY=sk-8146a78d6fdc4028849fbd487c551e0111.2. job.html #
templates/job.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>简历上传</title>
</head>
<body>
<h2>上传简历(仅支持PDF和Word)</h2>
<form id="uploadForm" enctype="multipart/form-data">
<input type="file" name="resume" accept=".pdf,.docx" required />
<button type="submit">上传</button>
</form>
<h3>简历内容:</h3>
+ <div id="analysisText"></div>
<script>
document.getElementById('uploadForm').onsubmit = async function (e) {
e.preventDefault();
const formData = new FormData(this);
const res = await fetch('/job/upload', {
method: 'POST',
body: formData
});
const data = await res.json();
if (data.success) {
+ const results = data.results;
+ console.log(results);
+ const result = results.map(item => `
+ <div>
+ <h3>${item.job_name}</h3>
+ <p>${item.company}</p>
+ <p>${item.analysis}</p>
+ </div>
+ `).join('');
+ document.getElementById('analysisText').innerHTML = result;
} else {
+ document.getElementById('analysisText').textContent = '分析失败:' + data.error;
}
}
</script>
</body>
</html>11.3. server.py #
server.py
# 导入uuid模块,用于生成唯一文件名
import uuid
# 从Flask框架导入Flask、jsonify、request、render_template
from flask import Flask, jsonify, request, render_template
# 从database模块导入数据库会话管理器和Job模型
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 从Flask框架导入Response类,用于自定义HTTP响应
from flask import Response
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于文件和路径操作
import os
# 导入Document类,用于解析docx文档
from docx import Document
# 导入pdfplumber库,用于解析pdf文档
import pdfplumber
# 导入openai库,用于调用OpenAI大模型
+from openai import OpenAI
+
# 配置日志输出格式和日志级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前模块的logger对象
logger = logging.getLogger(__name__)
# 创建Flask应用实例
app = Flask(__name__)
# 设置上传文件保存目录
app.config['UPLOAD_FOLDER'] = 'uploads'
# 设置最大上传文件大小为10MB
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx'}
# 如果上传目录不存在则创建
if not os.path.exists(app.config['UPLOAD_FOLDER']):
os.makedirs(app.config['UPLOAD_FOLDER'])
# 判断文件扩展名是否允许上传
+def allowed_file(ext):
+ return ext in ALLOWED_EXTENSIONS
# 定义GET请求路由,返回职位上传页面
@app.route('/job', methods=['GET'])
def job_upload_page():
return render_template('job.html')
# 创建OpenAI客户端实例,配置API密钥和基础URL
+client = OpenAI(
+ api_key=os.getenv('DEEPSEEK_API_KEY', ''),
+ base_url="https://api.deepseek.com/v1"
+)
+
# 定义大模型分析提示词模板,用于简历与职位匹配度分析
+PROMPT_TEMPLATE = '''你是一个AI前端招聘的技术专家
+请根据下面职位描述和简历的内容,给出是否匹配的明确结论,匹配或者不匹配
+职位信息如下:{job_desc}
+简历内容如下:{resume_text}
+请按以下格式输出分析结果:
+1. 是否符合职位要求(是/否),并简要说明原因
+2. 技术能力分析:
+ - 已具备的技术点
+ - 欠缺的技术点
+3. 改进建议:
+ - 具体的学习路径
+ - 推荐的学习资源
+ - 预计达到要求的时间周期
+注意:请只关注技术能力方面的匹配度,忽略城市、学历等其他因素'''
+
# 定义文件内容提取函数,接收文件路径和扩展名参数
+def extract_resume_text(save_path, ext):
# 初始化文本变量为空字符串
+ text = ''
# 判断文件扩展名是否为PDF格式
+ if ext == 'pdf':
# 使用pdfplumber库打开PDF文件
+ with pdfplumber.open(save_path) as pdf:
# 遍历PDF的每一页,提取文本内容并用换行符连接
+ text = "\n".join(page.extract_text() or '' for page in pdf.pages)
# 判断文件扩展名是否为DOCX格式
+ elif ext == 'docx':
# 使用python-docx库打开Word文档
+ doc = Document(save_path)
# 提取文档中所有段落的文本内容到列表中
+ text_list = [para.text for para in doc.paragraphs]
# 将段落文本用换行符连接成完整文本
+ text = "\n".join(text_list)
# 如果文件格式既不是PDF也不是DOCX
+ else:
# 抛出值错误异常,提示文件类型不支持
+ raise ValueError('文件类型不支持')
# 返回去除首尾空白字符的文本内容
+ return text.strip()
+
# 定义大模型分析函数,接收职位描述和简历文本作为参数
+def analyze_with_llm(job_desc, resume_text):
# 使用提示词模板格式化职位描述和简历文本
+ prompt = PROMPT_TEMPLATE.format(job_desc=job_desc, resume_text=resume_text)
# 尝试调用大模型API进行分析
+ try:
# 创建聊天完成请求,使用deepseek-chat模型
+ response = client.chat.completions.create(
# 指定使用的模型名称
+ model="deepseek-chat",
# 设置消息格式,包含用户角色和提示内容
+ messages=[
+ {"role": "system", "content": "你是一个"+os.getenv('SEARCH_KEYWORD','AI前端')+"招聘的技术专家"},
+ {"role": "user", "content": prompt}
+ ],
# 禁用流式输出,等待完整响应
+ stream=False
+ )
# 返回模型响应的内容,并去除首尾空白字符
+ return response.choices[0].message.content.strip()
# 捕获并处理可能出现的异常
+ except Exception as e:
# 记录错误日志,包含具体的错误信息
+ logger.error(f'大模型分析失败: {e}')
# 返回错误信息给调用方
+ return f'分析失败: {e}'
+
# 定义POST请求路由,处理简历文件上传
@app.route('/job/upload', methods=['POST'])
def upload_resume():
# 检查请求中是否包含名为'resume'的文件
if 'resume' not in request.files:
return jsonify({'success': False, 'error': '未选择文件'})
# 获取上传的文件对象
file = request.files['resume']
# 检查文件名是否为空,表示用户没有选择文件
if file.filename == '':
return jsonify({'success': False, 'error': '未选择文件'})
# 提取文件扩展名,转换为小写并去除点号
ext = os.path.splitext(file.filename)[1].lower().lstrip('.')
# 检查文件类型是否在允许的格式列表中
if not allowed_file(ext):
return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
# 生成唯一的文件名,使用UUID避免文件名冲突
filename = f"{uuid.uuid4().hex}.{ext}"
# 构建完整的文件保存路径
save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# 将文件保存到指定路径
file.save(save_path)
# 开始异常处理,确保文件操作的安全性
try:
# 调用文本提取函数,从上传的文件中提取文本内容
+ resume_text = extract_resume_text(save_path, ext)
# 使用数据库会话上下文管理器,自动管理数据库连接
+ with get_db_session() as session:
# 查询数据库中的职位信息,限制只获取1条记录
+ jobs = session.query(Job).limit(1).all()
# 初始化结果列表,用于存储分析结果
+ results = []
# 遍历每个职位记录
+ for job in jobs:
+ # 获取职位描述,如果为空则使用空字符串
+ job_desc = job.description or ''
+ # 调用大模型分析函数,分析简历与职位的匹配度
+ analysis = analyze_with_llm(job_desc, resume_text)
+ # 记录分析结果的日志信息
+ logger.info(f'分析结果: {analysis}')
+ # 将分析结果添加到结果列表中
+ results.append({
+ 'job_id': job.id,
+ 'job_name': job.job_name,
+ 'company': job.company,
+ 'analysis': analysis
+ })
# 返回成功响应,包含分析结果
+ return jsonify({'success': True, 'results': results})
# 捕获并处理可能出现的异常
except Exception as e:
# 记录错误日志,包含具体的错误信息
+ logger.error(f'简历解析或分析失败: {e}')
# 返回错误响应,包含错误信息
return jsonify({'success': False, 'error': str(e)})
# 定义GET请求路由,获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
try:
# 使用数据库会话上下文管理器,自动管理数据库连接
with get_db_session() as session:
# 查询所有职位记录
jobs = session.query(Job).all()
# 创建空列表用于存储职位数据
jobs_list = []
# 遍历每个职位对象,转换为字典格式
for job in jobs:
jobs_list.append({
'id': job.id,
'job_name': job.job_name,
'company': job.company,
'city': job.city,
'experience': job.experience,
'education': job.education,
'description': job.description
})
# 记录获取职位信息的日志
logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
# 返回JSON格式的职位数据
return Response(
json.dumps(jobs_list, ensure_ascii=False),
mimetype='application/json'
)
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"获取职位列表失败: {e}")
# 返回错误响应,状态码500
return jsonify({
'success': False,
'error': str(e)
}), 500
# 判断是否为主程序入口
if __name__ == '__main__':
# 记录服务启动日志
logger.info("启动Flask服务...")
# 启动Flask应用,监听所有IP的5000端口,开启调试模式
app.run(host='0.0.0.0', port=5000, debug=True)
12.markdown #
12.1. job.html #
templates/job.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>简历上传</title>
+ <script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</head>
<body>
<h2>上传简历(仅支持PDF和Word)</h2>
<form id="uploadForm" enctype="multipart/form-data">
<input type="file" name="resume" accept=".pdf,.docx" required />
<button type="submit">上传</button>
</form>
<h3>简历内容:</h3>
<div id="analysisText"></div>
<script>
document.getElementById('uploadForm').onsubmit = async function (e) {
e.preventDefault();
const formData = new FormData(this);
const res = await fetch('/job/upload', {
method: 'POST',
body: formData
});
const data = await res.json();
if (data.success) {
const results = data.results;
const result = results.map(item => `
<div>
<h3>${item.job_name}</h3>
<p>${item.company}</p>
+ <div>${marked.parse(item.analysis)}</div>
</div>
`).join('');
document.getElementById('analysisText').innerHTML = result;
} else {
document.getElementById('analysisText').textContent = '分析失败:' + data.error;
}
}
</script>
</body>
</html>13.美化输出 #
13.1. job.html #
templates/job.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>简历上传</title>
+ <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/github-markdown-css@5.5.1/github-markdown.min.css">
<script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</head>
<body>
<h2>上传简历(仅支持PDF和Word)</h2>
<form id="uploadForm" enctype="multipart/form-data">
<input type="file" name="resume" accept=".pdf,.docx" required />
<button type="submit">上传</button>
</form>
<h3>简历内容:</h3>
<div id="analysisText"></div>
<script>
document.getElementById('uploadForm').onsubmit = async function (e) {
e.preventDefault();
const formData = new FormData(this);
const res = await fetch('/job/upload', {
method: 'POST',
body: formData
});
const data = await res.json();
if (data.success) {
const results = data.results;
const result = results.map(item => `
<div>
<h3>${item.job_name}</h3>
<p>${item.company}</p>
+ <div class="analysis-content markdown-body">${marked.parse(item.analysis)}</div>
</div>
`).join('');
document.getElementById('analysisText').innerHTML = result;
} else {
document.getElementById('analysisText').textContent = '分析失败:' + data.error;
}
}
</script>
</body>
</html>14.流式输出 #
14.1. job.html #
templates/job.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>简历上传</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/github-markdown-css@5.5.1/github-markdown.min.css">
<script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</head>
<body>
<h2>上传简历(仅支持PDF和Word)</h2>
<form id="uploadForm" enctype="multipart/form-data">
<input type="file" name="resume" accept=".pdf,.docx" required />
<button type="submit">上传</button>
</form>
<h3>简历内容:</h3>
+ <div id="analysisText" style="padding: 50px;"></div>
<script>
+ // 定义滚动到底部的函数
function scrollToBottom() {
+ // 获取分析文本容器元素
const analysisDiv = document.getElementById('analysisText');
+ // 将容器的滚动位置设置到底部
analysisDiv.scrollTop = analysisDiv.scrollHeight;
+ // 平滑滚动整个页面到底部
window.scrollTo({ top: document.body.scrollHeight, behavior: 'smooth' });
}
+
+ // 为上传表单添加提交事件监听器
document.getElementById('uploadForm').onsubmit = async function (e) {
+ // 阻止表单默认提交行为
e.preventDefault();
+ // 创建FormData对象来收集表单数据
const formData = new FormData(this);
+ // 清空分析文本容器
document.getElementById('analysisText').innerHTML = '';
+ // 发送POST请求到流式上传接口
const res = await fetch('/job/upload/stream', {
method: 'POST',
body: formData
});
+ // 获取响应体的读取器
const reader = res.body.getReader();
+ // 创建文本解码器
const decoder = new TextDecoder();
+ // 存储每个职位分析内容的映射对象
let jobMap = {};
+ // 当前正在处理的职位ID
let currentJobId = null;
+ // 当前正在处理的职位div元素
let currentJobDiv = null;
+
+ // 循环读取流式数据
while (true) {
+ // 读取下一个数据块
const { done, value } = await reader.read();
+ // 如果读取完成则退出循环
if (done) break;
+ // 解码数据块为文本
const chunk = decoder.decode(value);
+ // 按行分割数据
const lines = chunk.split('\n');
+
+ // 遍历每一行数据
for (const line of lines) {
+ // 检查是否是数据行(以'data: '开头)
if (line.startsWith('data: ')) {
+ // 解析JSON数据
const data = JSON.parse(line.slice(6));
+
+ // 处理开始分析的消息
if (data.type === 'start') {
+ // 设置当前职位ID
currentJobId = data.job_id;
+ // 创建新的职位div元素
currentJobDiv = document.createElement('div');
+ // 设置职位div的HTML内容
currentJobDiv.innerHTML = `
<h3>${data.job_name}</h3>
<p>${data.company}</p>
<div class="analysis-content markdown-body" id="analysis-${currentJobId}"></div>
`;
+ // 将职位div添加到分析文本容器中
document.getElementById('analysisText').appendChild(currentJobDiv);
+ // 初始化该职位的分析内容
jobMap[currentJobId] = '';
+ // 滚动到底部
scrollToBottom();
+ }
+ // 处理内容更新的消息
+ else if (data.type === 'content') {
+ // 累加该职位的分析内容
jobMap[data.job_id] += data.content;
+ // 将Markdown内容转换为HTML并显示
document.getElementById(`analysis-${data.job_id}`).innerHTML = marked.parse(jobMap[data.job_id]);
+ // 滚动到底部
scrollToBottom();
+ }
+ // 处理错误消息
+ else if (data.type === 'error') {
+ // 在分析文本容器中添加错误信息
document.getElementById('analysisText').innerHTML += `<div style="color:red;">分析失败:${data.error}</div>`;
+ // 滚动到底部
scrollToBottom();
}
}
}
}
}
</script>
</body>
</html>14.2. server.py #
server.py
# 导入uuid模块,用于生成唯一文件名
import uuid
# 从Flask框架导入Flask、jsonify、request、render_template
from flask import Flask, jsonify, request, render_template, stream_with_context
# 从database模块导入数据库会话管理器和Job模型
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 从Flask框架导入Response类,用于自定义HTTP响应
from flask import Response
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于文件和路径操作
import os
# 导入Document类,用于解析docx文档
from docx import Document
# 导入pdfplumber库,用于解析pdf文档
import pdfplumber
# 导入openai库,用于调用OpenAI大模型
from openai import OpenAI
# 配置日志输出格式和日志级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前模块的logger对象
logger = logging.getLogger(__name__)
# 创建Flask应用实例
app = Flask(__name__)
# 设置上传文件保存目录
app.config['UPLOAD_FOLDER'] = 'uploads'
# 设置最大上传文件大小为10MB
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx'}
# 如果上传目录不存在则创建
if not os.path.exists(app.config['UPLOAD_FOLDER']):
os.makedirs(app.config['UPLOAD_FOLDER'])
# 判断文件扩展名是否允许上传
def allowed_file(ext):
return ext in ALLOWED_EXTENSIONS
# 定义GET请求路由,返回职位上传页面
@app.route('/job', methods=['GET'])
def job_upload_page():
return render_template('job.html')
# 创建OpenAI客户端实例,配置API密钥和基础URL
client = OpenAI(
api_key=os.getenv('DEEPSEEK_API_KEY', ''),
base_url="https://api.deepseek.com/v1"
)
# 定义大模型分析提示词模板,用于简历与职位匹配度分析
PROMPT_TEMPLATE = '''你是一个AI前端招聘的技术专家
请根据下面职位描述和简历的内容,给出是否匹配的明确结论,匹配或者不匹配
职位信息如下:{job_desc}
简历内容如下:{resume_text}
请按以下格式输出分析结果:
1. 是否符合职位要求(是/否),并简要说明原因
2. 技术能力分析:
- 已具备的技术点
- 欠缺的技术点
3. 改进建议:
- 具体的学习路径
- 推荐的学习资源
- 预计达到要求的时间周期
注意:请只关注技术能力方面的匹配度,忽略城市、学历等其他因素'''
# 定义文件内容提取函数,接收文件路径和扩展名参数
def extract_resume_text(save_path, ext):
# 初始化文本变量为空字符串
text = ''
# 判断文件扩展名是否为PDF格式
if ext == 'pdf':
# 使用pdfplumber库打开PDF文件
with pdfplumber.open(save_path) as pdf:
# 遍历PDF的每一页,提取文本内容并用换行符连接
text = "\n".join(page.extract_text() or '' for page in pdf.pages)
# 判断文件扩展名是否为DOCX格式
elif ext == 'docx':
# 使用python-docx库打开Word文档
doc = Document(save_path)
# 提取文档中所有段落的文本内容到列表中
text_list = [para.text for para in doc.paragraphs]
# 将段落文本用换行符连接成完整文本
text = "\n".join(text_list)
# 如果文件格式既不是PDF也不是DOCX
else:
# 抛出值错误异常,提示文件类型不支持
raise ValueError('文件类型不支持')
# 返回去除首尾空白字符的文本内容
return text.strip()
# 定义大模型分析函数,接收职位描述和简历文本作为参数
def analyze_with_llm(job_desc, resume_text):
# 使用提示词模板格式化职位描述和简历文本
prompt = PROMPT_TEMPLATE.format(job_desc=job_desc, resume_text=resume_text)
# 尝试调用大模型API进行分析
try:
# 创建聊天完成请求,使用deepseek-chat模型
response = client.chat.completions.create(
# 指定使用的模型名称
model="deepseek-chat",
# 设置消息格式,包含用户角色和提示内容
messages=[
{"role": "system", "content": "你是一个"+os.getenv('SEARCH_KEYWORD','AI前端')+"招聘的技术专家"},
{"role": "user", "content": prompt}
],
# 禁用流式输出,等待完整响应
stream=False
)
# 返回模型响应的内容,并去除首尾空白字符
return response.choices[0].message.content.strip()
# 捕获并处理可能出现的异常
except Exception as e:
# 记录错误日志,包含具体的错误信息
logger.error(f'大模型分析失败: {e}')
# 返回错误信息给调用方
return f'分析失败: {e}'
# 定义POST请求路由,处理简历文件上传
@app.route('/job/upload', methods=['POST'])
def upload_resume():
# 检查请求中是否包含名为'resume'的文件
if 'resume' not in request.files:
return jsonify({'success': False, 'error': '未选择文件'})
# 获取上传的文件对象
file = request.files['resume']
# 检查文件名是否为空,表示用户没有选择文件
if file.filename == '':
return jsonify({'success': False, 'error': '未选择文件'})
# 提取文件扩展名,转换为小写并去除点号
ext = os.path.splitext(file.filename)[1].lower().lstrip('.')
# 检查文件类型是否在允许的格式列表中
if not allowed_file(ext):
return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
# 生成唯一的文件名,使用UUID避免文件名冲突
filename = f"{uuid.uuid4().hex}.{ext}"
# 构建完整的文件保存路径
save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# 将文件保存到指定路径
file.save(save_path)
# 开始异常处理,确保文件操作的安全性
try:
# 调用文本提取函数,从上传的文件中提取文本内容
resume_text = extract_resume_text(save_path, ext)
# 使用数据库会话上下文管理器,自动管理数据库连接
with get_db_session() as session:
# 查询数据库中的职位信息,限制只获取1条记录
jobs = session.query(Job).limit(1).all()
# 初始化结果列表,用于存储分析结果
results = []
# 遍历每个职位记录
for job in jobs:
# 获取职位描述,如果为空则使用空字符串
job_desc = job.description or ''
# 调用大模型分析函数,分析简历与职位的匹配度
analysis = analyze_with_llm(job_desc, resume_text)
# 记录分析结果的日志信息
logger.info(f'分析结果: {analysis}')
# 将分析结果添加到结果列表中
results.append({
'job_id': job.id,
'job_name': job.job_name,
'company': job.company,
'analysis': analysis
})
# 返回成功响应,包含分析结果
return jsonify({'success': True, 'results': results})
# 捕获并处理可能出现的异常
except Exception as e:
# 记录错误日志,包含具体的错误信息
logger.error(f'简历解析或分析失败: {e}')
# 返回错误响应,包含错误信息
return jsonify({'success': False, 'error': str(e)})
# 定义GET请求路由,获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
try:
# 使用数据库会话上下文管理器,自动管理数据库连接
with get_db_session() as session:
# 查询所有职位记录
jobs = session.query(Job).all()
# 创建空列表用于存储职位数据
jobs_list = []
# 遍历每个职位对象,转换为字典格式
for job in jobs:
jobs_list.append({
'id': job.id,
'job_name': job.job_name,
'company': job.company,
'city': job.city,
'experience': job.experience,
'education': job.education,
'description': job.description
})
# 记录获取职位信息的日志
logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
# 返回JSON格式的职位数据
return Response(
json.dumps(jobs_list, ensure_ascii=False),
mimetype='application/json'
)
except Exception as e:
# 捕获异常并记录错误日志
logger.error(f"获取职位列表失败: {e}")
# 返回错误响应,状态码500
return jsonify({
'success': False,
'error': str(e)
}), 500
+# 定义流式上传简历的路由,处理POST请求
+@app.route('/job/upload/stream', methods=['POST'])
+def upload_resume_stream():
+ # 检查请求中是否包含简历文件
+ if 'resume' not in request.files:
+ return jsonify({'success': False, 'error': '未选择文件'})
+ # 获取上传的文件对象
+ file = request.files['resume']
+ # 检查文件名是否为空
+ if file.filename == '':
+ return jsonify({'success': False, 'error': '未选择文件'})
+ # 提取文件扩展名并转换为小写,去除点号
+ ext = os.path.splitext(file.filename)[1].lower().lstrip('.')
+ # 验证文件类型是否允许
+ if not allowed_file(ext):
+ return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
+ # 生成唯一的文件名
+ filename = f"{uuid.uuid4().hex}.{ext}"
+ # 构建完整的文件保存路径
+ save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
+ # 保存上传的文件到指定路径
+ file.save(save_path)
+ # 定义生成器函数,用于流式输出分析结果
+ def generate():
+ try:
+ # 提取简历文本内容
+ resume_text = extract_resume_text(save_path, ext)
+ # 使用数据库会话上下文管理器
+ with get_db_session() as session:
+ # 查询所有职位信息
+ jobs = session.query(Job).all()
+ # 遍历每个职位进行分析
+ for job in jobs:
+ # 获取职位描述,如果为空则使用空字符串
+ job_desc = job.description or ''
+ # 格式化提示词模板
+ prompt = PROMPT_TEMPLATE.format(job_desc=job_desc, resume_text=resume_text)
+ try:
+ # 调用AI接口进行流式分析
+ response = client.chat.completions.create(
+ model="deepseek-chat",
+ messages=[
+ {"role": "system", "content": "你是一个"+os.getenv('SEARCH_KEYWORD','AI前端')+"招聘的技术专家"},
+ {"role": "user", "content": prompt}
+ ],
+ stream=True
+ )
+ # 发送开始分析的数据流
+ yield f'data: ' + json.dumps({"job_id": job.id, "job_name": job.job_name, "company": job.company, "type": "start"}, ensure_ascii=False) + '\n\n'
+ # 遍历AI响应的每个数据块
+ for chunk in response:
+ # 检查是否有内容输出
+ if chunk.choices[0].delta.content:
+ # 获取内容片段
+ content = chunk.choices[0].delta.content
+ # 发送内容数据流
+ yield f'data: ' + json.dumps({"job_id": job.id, "type": "content", "content": content}, ensure_ascii=False) + '\n\n'
+ # 发送分析完成的数据流
+ yield f'data: ' + json.dumps({"job_id": job.id, "type": "end"}, ensure_ascii=False) + '\n\n'
+ except Exception as e:
+ # 发送错误信息的数据流
+ yield f'data: ' + json.dumps({"job_id": job.id, "type": "error", "error": str(e)}, ensure_ascii=False) + '\n\n'
+ # 发送所有分析完成的数据流
+ yield f'data: ' + json.dumps({"type": "all_complete"}, ensure_ascii=False) + '\n\n'
+ except Exception as e:
+ # 发送整体错误信息的数据流
+ yield f'data: ' + json.dumps({"type": "error", "error": str(e)}, ensure_ascii=False) + '\n\n'
+ # 返回流式响应,设置MIME类型为事件流
+ return app.response_class(stream_with_context(generate()), mimetype='text/event-stream')
# 判断是否为主程序入口
if __name__ == '__main__':
# 记录服务启动日志
logger.info("启动Flask服务...")
# 启动Flask应用,监听所有IP的5000端口,开启调试模式
app.run(host='0.0.0.0', port=5000, debug=True)