ai
  • index
  • cursor
  • vector
  • crawl
  • crawl-front
  • DrissionPage
  • logging
  • mysql
  • pprint
  • sqlalchemy
  • contextmanager
  • dotenv
  • Flask
  • python
  • job
  • pdfplumber
  • python-docx
  • redbook
  • douyin
  • ffmpeg
  • json
  • numpy
  • opencv-python
  • pypinyin
  • re
  • requests
  • subprocess
  • time
  • uuid
  • watermark
  • milvus
  • pymilvus
  • search
  • Blueprint
  • flash
  • Jinja2
  • secure_filename
  • url_for
  • Werkzeug
  • chroma
  • HNSW
  • pillow
  • pandas
  • beautifulsoup4
  • langchain-community
  • langchain-core
  • langchain
  • langchain_unstructured
  • libreoffice
  • lxml
  • openpyxl
  • pymupdf
  • python-pptx
  • RAGFlow
  • tabulate
  • sentence_transformers
  • jsonl
  • collections
  • jieba
  • rag_optimize
  • rag
  • rank_bm25
  • Hugging_Face
  • modelscope
  • all-MiniLM-L6-v2
  • ollama
  • rag_measure
  • ragas
  • ASGI
  • FastAPI
  • FastChat
  • Jupyter
  • PyTorch
  • serper
  • uvicorn
  • markdownify
  • NormalizedLevenshtein
  • raq-action
  • CrossEncoder
  • Bi-Encoder
  • neo4j
  • neo4j4python
  • matplotlib
  • Plotly
  • Streamlit
  • py2neo
  • abc
  • read_csv
  • neo4jinstall
  • APOC
  • neo4jproject
  • uv
  • GDS
  • heapq
  • 1.参考
  • 2.配置VSCode
    • 2.1. 创建虚拟环境
    • 2.2 格式化python代码插件
    • 2.3. 格式化python代码插件
  • 3.初始化爬虫类
    • 3.1 spider.py
  • 4.爬取职位信息
    • 4.1. .env
    • 4.2. spider.py
  • 5.爬取岗位描述
    • 5.1. spider.py
  • 6.保存数据库
    • 6.1. 安装
    • 6.2. .env
    • 6.3. spider.py
  • 7.分页处理
    • 7.1. spider.py
  • 8.定时任务
    • 8.1. scheduler.py
  • 9.Web服务
    • 9.1. server.py
    • 9.2. database.py
    • 9.3. spider.py
  • 10.上传简历
    • 10.1. job.html
    • 10.2. server.py
  • 11.匹配分析
    • 11.1. .env
    • 11.2. job.html
    • 11.3. server.py
  • 12.markdown
    • 12.1. job.html
  • 13.美化输出
    • 13.1. job.html
  • 14.流式输出
    • 14.1. job.html
    • 14.2. server.py

1.参考 #

  • python语法
  • logging
  • DrissionPage
  • mysql
  • sqlalchemy
  • python-dotenv
  • pdfplumber
  • python-docx

2.配置VSCode #

2.1. 创建虚拟环境 #

# 创建虚拟环境
python -m venv .venv
# 激活虚拟环境(Windows)
.venv\Scripts\activate.bat
# 激活虚拟环境(Linux/macOS)
source .venv/Scripts/activate

2.2 格式化python代码插件 #

black-formatter

按下Ctrl+Shift+P 选择 Preferences:OpenUserSettings(JSON)

添加

{
  "[python]": {
    "editor.defaultFormatter": "ms-python.black-formatter",
    "editor.formatOnSave": true
  }
}

2.3. 格式化python代码插件 #

VS Code 的 "Run Code"(由 Code Runner 扩展提供)可能未正确配置编码。

  1. 打开 VS Code 设置(Ctrl + ,),搜索 Code Runner,找到 Code-runner: Executor Map。
  2. 在 settings.json 中添加 Python 执行的编码配置:
    "code-runner.executorMap": {
     "python": "set PYTHONIOENCODING=utf8 && python -u $fullFileName"
    }
    如果想在Run Code时使用当前目录的虚拟环境可以配置
    "code-runner.executorMap": {
     "python": "set PYTHONIOENCODING=utf8 && $workspaceRoot/.venv/Scripts/python.exe -u $fullFileName",
    }
    (Windows 用户用 set,Mac/Linux 用 export)

3.初始化爬虫类 #

pip install DrissionPage

3.1 spider.py #

# 导入logging模块,用于日志记录
import logging
# 配置日志基本设置,日志级别为INFO,日志格式包含时间、级别和消息
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
# 定义BossSpider类,负责爬取职位信息
class BossSpider:
    # 定义爬取职位信息的方法
    def crawl_jobs(self):
        # 记录一条INFO级别的日志,表示开始爬取
        logger.info("开始爬取职位信息")
# 定义运行爬虫的函数
def run_spider():
    try:
        # 创建BossSpider实例
        spider = BossSpider()
        # 调用实例的爬取方法
        spider.crawl_jobs()
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"爬虫运行失败: {e}")
        # 重新抛出异常会保留原始的异常堆栈信息(traceback),这样更方便后续调试和定位问题。
        # raise e:抛出异常对象 e,但会丢失原始异常的堆栈信息
        raise
# 判断是否为主程序入口
if __name__ == '__main__':
    # 运行爬虫
    run_spider()

4.爬取职位信息 #

4.1. .env #

.env

# Chrome配置
CHROME_PATH=C:\Program Files\Google\Chrome\Application\chrome.exe
SEARCH_KEYWORD=AI前端

4.2. spider.py #

spider.py

# 导入日志模块
import logging
# 导入操作系统相关模块
+import os
# 导入pprint用于美观打印数据
+from pprint import pprint
# 导入类型提示相关类型
+from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类
+from DrissionPage import ChromiumOptions, Chromium

# 配置日志输出格式和级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
+CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
+KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')

# 定义BossSpider爬虫类
class BossSpider:
    # 初始化方法
+   def __init__(self):
        # 保存Chrome浏览器路径
+       self.chrome_path = CHROME_PATH
        # 初始化浏览器
+       self._setup_browser()
    # 浏览器初始化方法
+   def _setup_browser(self):
+       try:
            # 设置浏览器路径并保存配置
+           ChromiumOptions().set_browser_path(self.chrome_path).save()
            # 创建Chromium浏览器对象
+           self.browser = Chromium()
            # 获取最新的标签页对象
+           self.tab = self.browser.latest_tab
+       except Exception as e:
            # 浏览器初始化失败时记录错误日志
+           logger.error(f"浏览器初始化失败: {e}")
            # 抛出异常
+           raise
    # 获取职位详情的方法,参数为职位卡片对象
+   def get_job_details(self, card) -> Optional[Dict]:
+       try:
            # 获取职位名称
+           job_name = card.ele('.job-name').text
            # 获取公司名称
+           company = card.ele('@|class=boss-info@|class=boss-name').text
            # 获取城市信息
+           city = card.ele('.company-location').text
            # 获取职位标签列表
+           job_info = card.ele('.tag-list')
            # 获取所有标签元素
+           job_infos = job_info.eles('tag:li')
            # 获取工作经验
+           experience = job_infos[0].text if len(job_infos) > 0 else ''
            # 获取学历要求
+           education = job_infos[1].text if len(job_infos) > 1 else ''
            # 返回职位详情字典
+           return {
+               'job_name': job_name,
+               'company': company,
+               'city': city,
+               'experience': experience,
+               'education': education
+           }
+       except Exception as e:
            # 获取职位详情失败时记录错误日志
+           logger.error(f"获取职位详情失败: {e}")
            # 返回None
+           return None
    # 爬取职位信息的方法
    def crawl_jobs(self):
        # 记录开始爬取的日志
        logger.info("开始爬取职位信息")
        # 打开职位搜索页面
+       self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
+       try:
            # 等待职位卡片元素加载完成,超时时间10秒
+          self.tab.wait.eles_loaded('.job-card-box', timeout=10)
+       except Exception as e:
            # 等待超时时记录错误日志
+           logger.error(f"等待职位卡片元素超时: {e}")
            # 直接返回
+           return
        # 获取所有职位卡片元素
+       cards = self.tab.eles('.job-card-box')
        # 遍历每个职位卡片
+       for card in cards:
            # 获取职位详情
+           job_data = self.get_job_details(card)
            # 打印职位详情
+           pprint(job_data)

 # 定义运行爬虫的函数
def run_spider():
    try:
        # 创建BossSpider实例
        spider = BossSpider()
        # 调用爬取方法
        spider.crawl_jobs()
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"爬虫运行失败: {e}")
        # 重新抛出异常
        raise

# 判断是否为主程序入口
if __name__ == '__main__':
    # 运行爬虫
    run_spider()

5.爬取岗位描述 #

5.1. spider.py #

spider.py

# 导入日志模块
import logging
# 导入操作系统相关模块
import os
# 导入pprint用于美观打印数据
from pprint import pprint
# 导入类型提示相关类型
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类
from DrissionPage import ChromiumOptions, Chromium

# 配置日志输出格式和级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')

# 定义BossSpider爬虫类
class BossSpider:
    # 初始化方法
    def __init__(self):
        # 保存Chrome浏览器路径
        self.chrome_path = CHROME_PATH
        # 初始化浏览器
        self._setup_browser()
    # 浏览器初始化方法
    def _setup_browser(self):
        try:
            # 设置浏览器路径并保存配置
            ChromiumOptions().set_browser_path(self.chrome_path).save()
            # 创建Chromium浏览器对象
            self.browser = Chromium()
            # 获取最新的标签页对象
            self.tab = self.browser.latest_tab
        except Exception as e:
            # 浏览器初始化失败时记录错误日志
            logger.error(f"浏览器初始化失败: {e}")
            # 抛出异常
            raise
    # 获取职位详情的方法,参数为职位卡片对象
    def get_job_details(self, card) -> Optional[Dict]:
        try:
            # 获取职位名称
            job_name = card.ele('.job-name').text
            # 获取公司名称
            company = card.ele('@|class=boss-info@|class=boss-name').text
            # 获取城市信息
            city = card.ele('.company-location').text
            # 获取职位标签列表
            job_info = card.ele('.tag-list')
            # 获取所有标签元素
            job_infos = job_info.eles('tag:li')
            # 获取工作经验
            experience = job_infos[0].text if len(job_infos) > 0 else ''
            # 获取学历要求
            education = job_infos[1].text if len(job_infos) > 1 else ''
            # 等待监听到职位详情接口的响应
+           detail_result = self.tab.listen.wait()
            # 获取接口响应的body内容
+           detail_data = detail_result.response.body
            # 从响应数据中提取岗位描述信息
+           post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
            # 返回职位详情字典
            return {
                'job_name': job_name,
                'company': company,
                'city': city,
                'experience': experience,
+               'education': education,
+               'description': post_description
            }
        except Exception as e:
            # 获取职位详情失败时记录错误日志
            logger.error(f"获取职位详情失败: {e}")
            # 返回None
            return None
    # 爬取职位信息的方法
    def crawl_jobs(self):
        # 监听职位详情接口
+       self.tab.listen.start('wapi/zpgeek/job/detail.json')
        # 记录开始爬取的日志
        logger.info("开始爬取职位信息")
        # 打开职位搜索页面
        self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
        try:
            # 等待职位卡片元素加载完成,超时时间10秒
            self.tab.wait.eles_loaded('.job-card-box', timeout=10)
        except Exception as e:
            # 等待超时时记录错误日志
            logger.error(f"等待职位卡片元素超时: {e}")
            # 直接返回
            return
        # 获取所有职位卡片元素
        cards = self.tab.eles('.job-card-box')
        # 遍历每个职位卡片
        for card in cards:
            # 获取职位详情
            job_data = self.get_job_details(card)
            # 打印职位详情
            pprint(job_data)

# 定义运行爬虫的函数
def run_spider():
    try:
        # 创建BossSpider实例
        spider = BossSpider()
        # 调用爬取方法
        spider.crawl_jobs()
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"爬虫运行失败: {e}")
        # 重新抛出异常
        raise

# 判断是否为主程序入口
if __name__ == '__main__':
    # 运行爬虫
    run_spider()

6.保存数据库 #

6.1. 安装 #

pip install sqlalchemy  pymysql python-dotenv   

6.2. .env #

.env

# Chrome配置
CHROME_PATH=C:\Program Files\Google\Chrome\Application\chrome.exe
SEARCH_KEYWORD=AI前端

# 数据库配置
+MYSQL_HOST=localhost
+MYSQL_USER=root
+MYSQL_PASSWORD=123456
+MYSQL_PORT=3306
+MYSQL_DB=jobs

6.3. spider.py #

spider.py

# 导入日志模块
import logging
# 导入操作系统相关模块
import os
# 导入pprint用于美观打印数据
from pprint import pprint
# 导入类型提示相关类型
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类
from DrissionPage import ChromiumOptions, Chromium
# 从SQLAlchemy导入数据库相关模块
+from sqlalchemy import create_engine, Column, Integer, String, Text
# 从SQLAlchemy ORM导入声明基类和会话工厂
+from sqlalchemy.orm import declarative_base,sessionmaker
# 导入上下文管理器
+from contextlib import contextmanager
# 导入环境变量加载模块
+from dotenv import load_dotenv
# 加载环境变量文件
+load_dotenv()

# 配置日志输出格式和级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 定义MySQL数据库连接配置字典
+MYSQL_CONF = {
+   'user': os.getenv('MYSQL_USER', 'root'),
+   'password': os.getenv('MYSQL_PASSWORD', '123456'),
+   'host': os.getenv('MYSQL_HOST', 'localhost'),
+   'port': int(os.getenv('MYSQL_PORT', 3306)),
+   'db': os.getenv('MYSQL_DB', 'jobs')
+}
+
# 创建SQLAlchemy声明基类
+Base = declarative_base()
+
# 定义职位信息数据模型类
+class Job(Base):
+   """BOSS直聘职位信息表"""
    # 指定数据库表名
+   __tablename__ = 'jobs'

    # 主键字段
+   id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')

    # 职位基本信息字段
+   job_name = Column(String(128), nullable=True, comment='职位名称')
+   company = Column(String(128), nullable=True, comment='公司名称')
+   city = Column(String(64), nullable=True, comment='工作城市')

    # 职位要求字段
+   experience = Column(String(64), nullable=True, comment='工作经验要求')
+   education = Column(String(64), nullable=True, comment='学历要求')

    # 详细描述字段
+   description = Column(Text, nullable=True, comment='职位描述')
+
    # 定义对象的字符串表示方法
+   def __repr__(self):
+       """返回职位的字符串表示"""
+       return f"<Job(position='{self.position}', company='{self.company}', city='{self.city}')>"
+
# 创建数据库引擎对象
+engine = create_engine(
+   f"mysql+pymysql://{MYSQL_CONF['user']}:{MYSQL_CONF['password']}"
+   f"@{MYSQL_CONF['host']}:{MYSQL_CONF['port']}/{MYSQL_CONF['db']}?charset=utf8mb4"
+)
+
# 创建数据库会话工厂
+Session = sessionmaker(bind=engine)
+
# 定义数据库会话上下文管理器装饰器
+@contextmanager
+def get_db_session():
+   """数据库会话上下文管理器"""
    # 创建数据库会话
+   session = Session()
+   try:
        # 返回会话对象
+       yield session
        # 提交事务
+       session.commit()
+   except Exception as e:
        # 发生异常时回滚事务
+       session.rollback()
        # 重新抛出异常
+       raise
+   finally:
        # 确保会话被关闭
+       session.close()
+
# 获取以当前模块名为名称的logger对象
logger = logging.getLogger(__name__)
+
# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')
+
# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')

# 定义BossSpider爬虫类
class BossSpider:
    # 初始化方法
    def __init__(self):
        # 保存Chrome浏览器路径
        self.chrome_path = CHROME_PATH
        # 初始化浏览器
        self._setup_browser()

    # 浏览器初始化方法
    def _setup_browser(self):
        try:
            # 设置浏览器路径并保存配置
            ChromiumOptions().set_browser_path(self.chrome_path).save()
            # 创建Chromium浏览器对象
            self.browser = Chromium()
            # 获取最新的标签页对象
            self.tab = self.browser.latest_tab
        except Exception as e:
            # 浏览器初始化失败时记录错误日志
            logger.error(f"浏览器初始化失败: {e}")
            # 抛出异常
            raise

    # 获取职位详情的方法,参数为职位卡片对象
    def get_job_details(self, card) -> Optional[Dict]:
        try:
            # 获取职位名称
            job_name = card.ele('.job-name').text
            # 获取公司名称
            company = card.ele('@|class=boss-info@|class=boss-name').text
            # 获取城市信息
            city = card.ele('.company-location').text
            # 打印进度
+           logger.info(f"{company} {job_name}")
            # 获取职位标签列表
            job_info = card.ele('.tag-list')
            # 获取所有标签元素
            job_infos = job_info.eles('tag:li')
            # 获取工作经验
            experience = job_infos[0].text if len(job_infos) > 0 else ''
            # 获取学历要求
            education = job_infos[1].text if len(job_infos) > 1 else ''
            # 点击职位卡片
            card.click()
            # 等待1秒
            self.browser.wait(1)
            # 等待监听到职位详情接口的响应
            detail_result = self.tab.listen.wait()
            # 获取接口响应的body内容
            detail_data = detail_result.response.body
            # 从响应数据中提取岗位描述信息
            post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
            # 返回职位详情字典
            return {
                'job_name': job_name,
                'company': company,
                'city': city,
                'experience': experience,
                'education': education,
                'description': post_description
            }
        except Exception as e:
            # 获取职位详情失败时记录错误日志
            logger.error(f"获取职位详情失败: {e}")
            # 返回None
            return None

    # 爬取职位信息的方法
    def crawl_jobs(self):
        # 创建数据库表结构
+       Base.metadata.create_all(engine)
        # 监听职位详情接口
        self.tab.listen.start('wapi/zpgeek/job/detail.json')
        # 记录开始爬取的日志
        logger.info("开始爬取职位信息")
        # 打开职位搜索页面
        self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
        try:
            # 等待职位卡片元素加载完成,超时时间10秒
            self.tab.wait.eles_loaded('.job-card-box', timeout=10)
        except Exception as e:
            # 等待超时时记录错误日志
            logger.error(f"等待职位卡片元素超时: {e}")
            # 直接返回
            return
+       
        # 使用数据库会话上下文管理器
+       with get_db_session() as session:
            # 获取所有职位卡片元素
+           cards = self.tab.eles('.job-card-box')
            # 遍历每个职位卡片
+           for card in cards:
+               # 点击职位卡片
+               card.click()
+               # 等待1秒
+               self.browser.wait(1)
+               # 获取职位详情
+               job_data = self.get_job_details(card)
+               # 如果获取到职位数据,则添加到数据库会话中
+               if job_data:
+                   session.add(Job(**job_data))
+               # 等待1秒
+               self.tab.wait(1)    

# 定义运行爬虫的函数
def run_spider():
    try:
        # 创建BossSpider实例
        spider = BossSpider()
        # 调用爬取方法
        spider.crawl_jobs()
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"爬虫运行失败: {e}")
        # 重新抛出异常
        raise

# 判断是否为主程序入口
if __name__ == '__main__':
    # 运行爬虫
    run_spider()

7.分页处理 #

7.1. spider.py #

spider.py

# 导入日志模块,用于记录程序运行过程中的各种信息
import logging
# 导入操作系统相关模块,用于获取环境变量和系统路径
import os
# 导入pprint用于美观打印数据,便于调试时查看复杂数据结构
from pprint import pprint
# 导入类型提示相关类型,用于函数参数和返回值的类型注解
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类,用于控制Chrome浏览器
from DrissionPage import ChromiumOptions, Chromium
# 从SQLAlchemy导入数据库相关模块,用于数据库连接和表结构定义
from sqlalchemy import create_engine, Column, Integer, String, Text
# 从SQLAlchemy ORM导入声明基类和会话工厂,用于ORM操作
from sqlalchemy.orm import declarative_base,sessionmaker
# 导入上下文管理器,用于自动管理资源的获取和释放
from contextlib import contextmanager
# 导入环境变量加载模块,用于从.env文件加载配置
from dotenv import load_dotenv
# 加载环境变量文件,读取.env文件中的配置信息
load_dotenv()

# 配置日志输出格式和级别,设置日志的时间格式、级别和输出格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 定义MySQL数据库连接配置字典,从环境变量获取数据库连接参数
MYSQL_CONF = {
    'user': os.getenv('MYSQL_USER', 'root'),
    'password': os.getenv('MYSQL_PASSWORD', '123456'),
    'host': os.getenv('MYSQL_HOST', 'localhost'),
    'port': int(os.getenv('MYSQL_PORT', 3306)),
    'db': os.getenv('MYSQL_DB', 'jobs')
}

# 创建SQLAlchemy声明基类,所有数据模型类都需要继承这个基类
Base = declarative_base()

# 定义职位信息数据模型类,对应数据库中的jobs表
class Job(Base):
    """BOSS直聘职位信息表"""
    # 指定数据库表名,定义该模型对应的数据库表名称
    __tablename__ = 'jobs'

    # 主键字段,自增的整数类型主键
    id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')

    # 职位基本信息字段,存储职位名称、公司名称和工作城市
    job_name = Column(String(128), nullable=True, comment='职位名称')
    company = Column(String(128), nullable=True, comment='公司名称')
    city = Column(String(64), nullable=True, comment='工作城市')

    # 职位要求字段,存储工作经验要求和学历要求
    experience = Column(String(64), nullable=True, comment='工作经验要求')
    education = Column(String(64), nullable=True, comment='学历要求')

    # 详细描述字段,存储完整的职位描述信息
    description = Column(Text, nullable=True, comment='职位描述')

    # 定义对象的字符串表示方法,用于调试时显示对象信息
    def __repr__(self):
        """返回职位的字符串表示"""
        return f"<Job(position='{self.position}', company='{self.company}', city='{self.city}')>"

# 创建数据库引擎对象,建立与MySQL数据库的连接
engine = create_engine(
    f"mysql+pymysql://{MYSQL_CONF['user']}:{MYSQL_CONF['password']}"
    f"@{MYSQL_CONF['host']}:{MYSQL_CONF['port']}/{MYSQL_CONF['db']}?charset=utf8mb4"
)

# 创建数据库会话工厂,用于生成数据库会话对象
Session = sessionmaker(bind=engine)

# 定义数据库会话上下文管理器装饰器,自动管理数据库会话的生命周期
@contextmanager
def get_db_session():
    """数据库会话上下文管理器"""
    # 创建数据库会话对象
    session = Session()
    try:
        # 返回会话对象给调用者使用
        yield session
        # 如果没有异常发生,提交事务保存数据
        session.commit()
    except Exception as e:
        # 发生异常时回滚事务,撤销未提交的更改
        session.rollback()
        # 重新抛出异常,让上层代码处理
        raise
    finally:
        # 确保会话被关闭,释放数据库连接资源
        session.close()

# 获取以当前模块名为名称的logger对象,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)

# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')

# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')

# 定义BossSpider爬虫类,负责爬取BOSS直聘网站的职位信息
class BossSpider:
    # 初始化方法,设置爬虫的基本配置
    def __init__(self):
        # 保存Chrome浏览器路径到实例变量
        self.chrome_path = CHROME_PATH
        # 调用浏览器初始化方法
        self._setup_browser()

    # 浏览器初始化方法,配置并启动Chrome浏览器
    def _setup_browser(self):
        try:
            # 设置浏览器路径并保存配置到DrissionPage
            ChromiumOptions().set_browser_path(self.chrome_path).save()
            # 创建Chromium浏览器对象
            self.browser = Chromium()
            # 获取最新的标签页对象,用于后续的页面操作
            self.tab = self.browser.latest_tab
        except Exception as e:
            # 浏览器初始化失败时记录错误日志
            logger.error(f"浏览器初始化失败: {e}")
            # 抛出异常,终止程序执行
            raise

    # 获取职位详情的方法,从职位卡片中提取详细信息
    def get_job_details(self, card) -> Optional[Dict]:
        try:
            # 获取职位名称,通过CSS选择器定位元素并提取文本
            job_name = card.ele('.job-name').text
            # 获取公司名称,使用复合选择器定位boss信息区域
            company = card.ele('@|class=boss-info@|class=boss-name').text
            # 获取城市信息,从公司位置元素中提取
            city = card.ele('.company-location').text
            # 获取职位标签列表容器元素
            job_info = card.ele('.tag-list')
            # 获取所有标签元素,这些标签包含经验和学历要求
            job_infos = job_info.eles('tag:li')
            # 获取工作经验要求,如果存在标签则取第一个
            experience = job_infos[0].text if len(job_infos) > 0 else ''
            # 获取学历要求,如果存在第二个标签则取第二个
            education = job_infos[1].text if len(job_infos) > 1 else ''
            # 等待监听到职位详情接口的响应,获取API数据
            detail_result = self.tab.listen.wait()
            # 获取接口响应的body内容,包含职位详细信息
            detail_data = detail_result.response.body
            # 从响应数据中提取岗位描述信息,使用嵌套字典访问
            post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
            # 返回职位详情字典,包含所有提取的信息
            return {
                'job_name': job_name,
                'company': company,
                'city': city,
                'experience': experience,
                'education': education,
                'description': post_description
            }
        except Exception as e:
            # 获取职位详情失败时记录错误日志
            logger.error(f"获取职位详情失败: {e}")
            # 返回None表示获取失败
            return None

    # 爬取职位信息的主要方法,负责整个爬取流程
    def crawl_jobs(self):
        # 创建数据库表结构,如果表不存在则创建
        Base.metadata.create_all(engine)
        # 监听职位详情接口,准备捕获API响应
        self.tab.listen.start('wapi/zpgeek/job/detail.json')
        # 记录开始爬取的日志信息
        logger.info("开始爬取职位信息")
        # 打开职位搜索页面,使用关键词进行搜索
        self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
        # 打印标签页对象信息,用于调试
        pprint(self.tab)
        try:
            # 等待职位卡片元素加载完成,超时时间10秒
            self.tab.wait.eles_loaded('.job-card-box', timeout=10)
        except Exception as e:
            # 等待超时时记录错误日志
            logger.error(f"等待职位卡片元素超时: {e}")
            # 直接返回
            return
+       processed_count = 0  # 已处理的元素数量
        # 使用数据库会话上下文管理器
        with get_db_session() as session:
+            while True:
+               # 获取所有职位卡片元素
+               cards = self.tab.eles('.job-card-box')
+               current_count = len(cards)
+               if current_count <= processed_count:
+                   logger.info(f"已加载完所有职位,共 {current_count} 个")
+                   break
+               # 遍历每个职位卡片
+               for card in cards[processed_count:]:
                    # 点击职位卡片,触发详情页面加载
                    card.click()
                    # 等待1秒,让页面有时间响应点击事件
                    self.browser.wait(1)
                    # 获取职位详情
                    job_data = self.get_job_details(card)
                    # 如果获取到职位数据,则添加到数据库会话中
                    if job_data:
                        session.add(Job(**job_data))
                    # 等待1秒
                    self.tab.wait(1)    
+               processed_count = current_count    
+               print(f"已处理 {processed_count} 个职位")
+               self.tab.scroll.to_bottom()
+               self.tab.wait(2)

# 定义运行爬虫的函数
def run_spider():
    try:
        # 创建BossSpider实例
        spider = BossSpider()
        # 调用爬取方法
        spider.crawl_jobs()
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"爬虫运行失败: {e}")
        # 重新抛出异常
        raise

# 判断是否为主程序入口
if __name__ == '__main__':
    # 运行爬虫
    run_spider()

8.定时任务 #

pip install schedule

8.1. scheduler.py #

scheduler.py

# 导入定时任务调度库,用于设置定时执行任务
import schedule
# 导入时间模块并重命名为t,用于控制程序休眠时间
import time as t
# 导入操作系统模块,用于获取环境变量
import os
# 从dotenv库导入环境变量加载函数
from dotenv import load_dotenv
# 从spider模块导入爬虫运行函数
from spider import run_spider

# 加载.env文件中的环境变量到系统环境
load_dotenv()

# 判断是否为主程序入口
if __name__ == '__main__':
    # 从环境变量获取定时执行时间,默认为每天05:00
    schedule_time = os.getenv('SCHEDULE_TIME', '05:00')
    # 设置每天在指定时间执行爬虫任务
    schedule.every().day.at(schedule_time).do(run_spider)
    # 打印启动信息,显示定时任务的执行时间
    print(f"计划任务已启动,每天 {schedule_time} 执行爬虫。")
    # 无限循环,持续检查是否有待执行的任务
    while True:
        # 检查并执行到期的定时任务
        schedule.run_pending()
        # 程序休眠60秒,避免过度占用CPU资源
        t.sleep(60) 

9.Web服务 #

pip install flask

9.1. server.py #

server.py

# 从Flask框架导入Flask应用类和jsonify函数,用于创建Web应用和返回JSON响应
from flask import Flask, jsonify
# 从database模块导入数据库会话管理、Job模型和数据库初始化函数
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 导入Response类,用于创建HTTP响应
from flask import Response
# 导入日志模块,用于记录程序运行过程中的各种信息
import logging

# 配置日志输出格式和级别,设置日志的时间格式、级别和输出格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取以当前模块名为名称的logger对象,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)

# 创建Flask应用实例,用于处理HTTP请求和响应
app = Flask(__name__)
# 定义GET请求路由,用于获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
    try:
        # 使用数据库会话上下文管理器,自动管理数据库连接的获取和释放
        with get_db_session() as session:
            # 查询数据库中的所有职位记录
            jobs = session.query(Job).all()
            # 创建空列表用于存储格式化后的职位数据
            jobs_list = []
            # 遍历每个职位对象,将其转换为字典格式
            for job in jobs:
                jobs_list.append({
                    'id': job.id,
                    'job_name': job.job_name,
                    'company': job.company,
                    'city': job.city,
                    'experience': job.experience,
                    'education': job.education,
                    'description': job.description
                })
            # 记录成功获取职位信息的日志
            logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
            # 返回JSON格式的成功响应,包含职位数据和总数
#           return jsonify({
#                'success': True,
#                'data': jobs_list,
#                'total': len(jobs_list)
#            })
            return Response(
              json.dumps(jobs_list, ensure_ascii=False),
              mimetype='application/json'
            )
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"获取职位列表失败: {e}")
        # 返回JSON格式的错误响应,状态码为500
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
# 判断是否为主程序入口
if __name__ == '__main__':
    # 记录启动服务的日志信息
    logger.info("启动Flask服务...")
    # 启动Flask应用,监听所有网络接口的5000端口,开启调试模式
    app.run(host='0.0.0.0', port=5000, debug=True)

9.2. database.py #

database.py

# 从SQLAlchemy导入数据库引擎创建、列定义、整数类型、字符串类型和文本类型
from sqlalchemy import create_engine, Column, Integer, String, Text
# 从SQLAlchemy ORM导入声明基类和会话工厂
from sqlalchemy.orm import declarative_base, sessionmaker
# 导入上下文管理器,用于自动管理资源的获取和释放
from contextlib import contextmanager
# 导入操作系统模块,用于获取环境变量
import os
# 从dotenv库导入环境变量加载函数
from dotenv import load_dotenv

# 加载.env文件中的环境变量到系统环境
load_dotenv()

# 定义MySQL数据库连接配置字典,从环境变量获取配置信息
MYSQL_CONF = {
    # 数据库用户名,默认为root
    'user': os.getenv('MYSQL_USER', 'root'),
    # 数据库密码,默认为123456
    'password': os.getenv('MYSQL_PASSWORD', '123456'),
    # 数据库主机地址,默认为localhost
    'host': os.getenv('MYSQL_HOST', 'localhost'),
    # 数据库端口号,默认为3306
    'port': int(os.getenv('MYSQL_PORT', 3306)),
    # 数据库名称,默认为jobs
    'db': os.getenv('MYSQL_DB', 'jobs')
}

# 创建SQLAlchemy声明基类,用于定义数据库表模型
Base = declarative_base()

# 定义Job职位信息表模型类,继承自Base
class Job(Base):
    # 指定数据库表名为jobs
    __tablename__ = 'jobs'
    # 定义主键ID字段,整数类型,自动递增
    id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
    # 定义职位名称字段,字符串类型,最大长度128,可为空
    job_name = Column(String(128), nullable=True, comment='职位名称')
    # 定义公司名称字段,字符串类型,最大长度128,可为空
    company = Column(String(128), nullable=True, comment='公司名称')
    # 定义工作城市字段,字符串类型,最大长度64,可为空
    city = Column(String(64), nullable=True, comment='工作城市')
    # 定义工作经验要求字段,字符串类型,最大长度64,可为空
    experience = Column(String(64), nullable=True, comment='工作经验要求')
    # 定义学历要求字段,字符串类型,最大长度64,可为空
    education = Column(String(64), nullable=True, comment='学历要求')
    # 定义职位描述字段,文本类型,可为空
    description = Column(Text, nullable=True, comment='职位描述')
    # 定义对象的字符串表示方法,用于调试时显示对象信息
    def __repr__(self):
        return f"<Job(job_name='{self.job_name}', company='{self.company}', city='{self.city}')>"

# 创建数据库引擎,使用MySQL连接配置构建连接字符串
engine = create_engine(
    # 构建MySQL连接URL,包含用户名、密码、主机、端口、数据库名和字符集
    f"mysql+pymysql://{MYSQL_CONF['user']}:{MYSQL_CONF['password']}"
    f"@{MYSQL_CONF['host']}:{MYSQL_CONF['port']}/{MYSQL_CONF['db']}?charset=utf8mb4"
)
# 根据模型定义创建数据库表
Base.metadata.create_all(engine) 
# 创建会话工厂,绑定到数据库引擎
Session = sessionmaker(bind=engine)

# 定义数据库会话上下文管理器,自动管理会话的创建、提交、回滚和关闭
@contextmanager
def get_db_session():
    # 创建新的数据库会话
    session = Session()
    try:
        # 返回会话对象给调用者使用
        yield session
        # 提交事务,保存所有更改
        session.commit()
    except Exception as e:
        # 发生异常时回滚事务,撤销所有更改
        session.rollback()
        # 重新抛出异常
        raise
    finally:
        # 无论是否发生异常,都要关闭会话
        session.close()


9.3. spider.py #

spider.py

# 导入日志模块,用于记录程序运行过程中的各种信息
import logging
# 导入操作系统相关模块,用于获取环境变量和系统路径
import os
# 导入pprint用于美观打印数据,便于调试时查看复杂数据结构
from pprint import pprint
# 导入类型提示相关类型,用于函数参数和返回值的类型注解
from typing import Optional, Dict
# 从DrissionPage库导入ChromiumOptions和Chromium类,用于控制Chrome浏览器
from DrissionPage import ChromiumOptions, Chromium
# 导入环境变量加载模块,用于从.env文件加载配置
from dotenv import load_dotenv
# 加载环境变量文件,读取.env文件中的配置信息
load_dotenv()
# 从database.py导入get_db_session、Job
+from database import get_db_session, Job

# 配置日志输出格式和级别,设置日志的时间格式、级别和输出格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 获取以当前模块名为名称的logger对象,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)

# 从环境变量获取Chrome浏览器路径,若未设置则使用默认路径
CHROME_PATH = os.getenv('CHROME_PATH', r'C:\Program Files\Google\Chrome\Application\chrome.exe')

# 从环境变量获取搜索关键词,若未设置则默认为'AI前端'
KEYWORD = os.getenv('SEARCH_KEYWORD', 'AI前端')

# 定义BossSpider爬虫类,负责爬取BOSS直聘网站的职位信息
class BossSpider:
    # 初始化方法,设置爬虫的基本配置
    def __init__(self):
        # 保存Chrome浏览器路径到实例变量
        self.chrome_path = CHROME_PATH
        # 调用浏览器初始化方法
        self._setup_browser()

    # 浏览器初始化方法,配置并启动Chrome浏览器
    def _setup_browser(self):
        try:
            # 设置浏览器路径并保存配置到DrissionPage
            ChromiumOptions().set_browser_path(self.chrome_path).save()
            # 创建Chromium浏览器对象
            self.browser = Chromium()
            # 获取最新的标签页对象,用于后续的页面操作
            self.tab = self.browser.latest_tab
        except Exception as e:
            # 浏览器初始化失败时记录错误日志
            logger.error(f"浏览器初始化失败: {e}")
            # 抛出异常,终止程序执行
            raise

    # 获取职位详情的方法,从职位卡片中提取详细信息
    def get_job_details(self, card) -> Optional[Dict]:
        try:
            # 获取职位名称,通过CSS选择器定位元素并提取文本
            job_name = card.ele('.job-name').text
            # 获取公司名称,使用复合选择器定位boss信息区域
            company = card.ele('@|class=boss-info@|class=boss-name').text
            # 获取城市信息,从公司位置元素中提取
            city = card.ele('.company-location').text
            # 获取职位标签列表容器元素
            job_info = card.ele('.tag-list')
            # 获取所有标签元素,这些标签包含经验和学历要求
            job_infos = job_info.eles('tag:li')
            # 获取工作经验要求,如果存在标签则取第一个
            experience = job_infos[0].text if len(job_infos) > 0 else ''
            # 获取学历要求,如果存在第二个标签则取第二个
            education = job_infos[1].text if len(job_infos) > 1 else ''
            # 等待监听到职位详情接口的响应,获取API数据
            detail_result = self.tab.listen.wait()
            # 点击职位卡片,触发详情页面加载
            card.click()
            # 等待1秒,让页面有时间响应点击事件
            self.browser.wait(1)
            # 获取接口响应的body内容,包含职位详细信息
            detail_data = detail_result.response.body
            # 从响应数据中提取岗位描述信息,使用嵌套字典访问
            post_description = detail_data.get('zpData', {}).get('jobInfo', {}).get('postDescription', '')
            # 返回职位详情字典,包含所有提取的信息
            return {
                'job_name': job_name,
                'company': company,
                'city': city,
                'experience': experience,
                'education': education,
                'description': post_description
            }
        except Exception as e:
            # 获取职位详情失败时记录错误日志
            logger.error(f"获取职位详情失败: {e}")
            # 返回None表示获取失败
            return None

    # 爬取职位信息的主要方法,负责整个爬取流程
    def crawl_jobs(self):
        # 记录开始爬取的日志信息
        logger.info("开始爬取职位信息")
        # 打开职位搜索页面,使用关键词进行搜索
        self.tab.get(f'https://www.zhipin.com/web/geek/jobs?query={KEYWORD}')
        # 打印标签页对象信息,用于调试
        pprint(self.tab)
        try:
            # 等待职位卡片元素加载完成,超时时间10秒
            self.tab.wait.eles_loaded('.job-card-box', timeout=10)
        except Exception as e:
            # 等待超时时记录错误日志
            logger.error(f"等待职位卡片元素超时: {e}")
            # 直接返回
            return
        processed_count = 0  # 已处理的元素数量
        # 使用数据库会话上下文管理器
        with get_db_session() as session:
             while True:
                # 获取所有职位卡片元素
                cards = self.tab.eles('.job-card-box')
                current_count = len(cards)
                if current_count <= processed_count:
                    logger.info(f"已加载完所有职位,共 {current_count} 个")
                    break
                # 遍历每个职位卡片
                for card in cards[processed_count:]:
                    # 获取职位详情
                    job_data = self.get_job_details(card)
                    # 如果获取到职位数据,则添加到数据库会话中
                    if job_data:
                        session.add(Job(**job_data))
                    # 等待1秒
                    self.tab.wait(1)    
                processed_count = current_count    
                print(f"已处理 {processed_count} 个职位")
                self.tab.scroll.to_bottom()
                self.tab.wait(2)

# 定义运行爬虫的函数
def run_spider():
    try:
        # 创建BossSpider实例
        spider = BossSpider()
        # 调用爬取方法
        spider.crawl_jobs()
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"爬虫运行失败: {e}")
        # 重新抛出异常
        raise

# 判断是否为主程序入口
if __name__ == '__main__':
    # 运行爬虫
    run_spider()

10.上传简历 #

pip install pdfplumber  python-docx

10.1. job.html #

templates/job.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <title>简历上传</title>
</head>

<body>
  <h2>上传简历(仅支持PDF和Word)</h2>
  <form id="uploadForm" enctype="multipart/form-data">
    <input type="file" name="resume" accept=".pdf,.docx" required />
    <button type="submit">上传</button>
  </form>
  <h3>简历内容:</h3>
  <pre id="resumeText"></pre>
  <script>
    document.getElementById('uploadForm').onsubmit = async function (e) {
      e.preventDefault();
      const formData = new FormData(this);
      const res = await fetch('/job/upload', {
        method: 'POST',
        body: formData
      });
      const data = await res.json();
      if (data.success) {
        document.getElementById('resumeText').textContent = data.text;
      } else {
        document.getElementById('resumeText').textContent = '解析失败:' + data.error;
      }
    }
  </script>
</body>

</html>

10.2. server.py #

server.py

# 导入uuid模块,用于生成唯一文件名
+import uuid
# 从Flask框架导入Flask、jsonify、request、render_template
+from flask import Flask, jsonify, request, render_template
# 从database模块导入数据库会话管理器和Job模型
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 从Flask框架导入Response类,用于自定义HTTP响应
from flask import Response
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于文件和路径操作
+import os
# 导入pdfplumber库,用于解析pdf文档
+import pdfplumber
# 导入Document类,用于解析docx文档
+from docx import Document
# 配置日志输出格式和日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前模块的logger对象
logger = logging.getLogger(__name__)

# 创建Flask应用实例
app = Flask(__name__)
# 设置上传文件保存目录
+app.config['UPLOAD_FOLDER'] = 'uploads'
# 设置最大上传文件大小为10MB
+app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
# 允许上传的文件扩展名集合
+ALLOWED_EXTENSIONS = {'pdf', 'docx'}
+
# 如果上传目录不存在则创建
+if not os.path.exists(app.config['UPLOAD_FOLDER']):
+   os.makedirs(app.config['UPLOAD_FOLDER'])
+
# 判断文件扩展名是否允许上传
def allowed_file(ext):
    return ext in ALLOWED_EXTENSIONS

# 定义GET请求路由,返回职位上传页面
+@app.route('/job', methods=['GET'])
+def job_upload_page():
+   return render_template('job.html')
+
# 定义POST请求路由,处理简历文件上传
+@app.route('/job/upload', methods=['POST'])
+def upload_resume():
    # 检查请求中是否包含文件
+   if 'resume' not in request.files:
+       return jsonify({'success': False, 'error': '未选择文件'})
    # 获取上传的文件对象
+   file = request.files['resume']
    # 检查文件名是否为空
+   if file.filename == '':
+       return jsonify({'success': False, 'error': '未选择文件'})
+   # 获取文件扩展名并转为小写
+   ext = os.path.splitext(file.filename)[1].lower().lstrip(".")
    # 检查文件类型是否合法
+   if not allowed_file(ext):
+       return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
    # 生成唯一文件名
+   filename = f"{uuid.uuid4().hex}.{ext}"
    # 拼接保存路径
+   save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    # 保存文件到指定路径
+   file.save(save_path)
+   try:
        # 初始化文本内容为空
+       text = ''
        # 如果是pdf文件,使用pdfplumber解析
+       if ext == 'pdf':
+           with pdfplumber.open(save_path) as pdf:
+               text = "\n".join(page.extract_text() for page in pdf.pages)
        # 如果是docx或doc文件,使用python-docx解析
+       elif ext == 'docx':
+           doc = Document(save_path)
+           text_list = []
+           for para in doc.paragraphs:
+               text_list.append(para.text)
+           text = "\n".join(text_list)
        # 其他文件类型不支持
+       else:
+           return jsonify({'success': False, 'error': '文件类型不支持'})
        # 返回解析后的文本内容
+       return jsonify({'success': True, 'text': text.strip()})
+   except Exception as e:
        # 解析失败时记录错误日志
+       logger.error(f'简历解析失败: {e}')
        # 返回错误信息
+       return jsonify({'success': False, 'error': str(e)})
+
# 定义GET请求路由,获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
    try:
        # 使用数据库会话上下文管理器,自动管理数据库连接
        with get_db_session() as session:
            # 查询所有职位记录
            jobs = session.query(Job).all()
            # 创建空列表用于存储职位数据
            jobs_list = []
            # 遍历每个职位对象,转换为字典格式
            for job in jobs:
                jobs_list.append({
                    'id': job.id,
                    'job_name': job.job_name,
                    'company': job.company,
                    'city': job.city,
                    'experience': job.experience,
                    'education': job.education,
                    'description': job.description
                })
            # 记录获取职位信息的日志
            logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
            # 返回JSON格式的职位数据
            return Response(
+               json.dumps(jobs_list, ensure_ascii=False),
+               mimetype='application/json'
            )
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"获取职位列表失败: {e}")
        # 返回错误响应,状态码500
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
+
# 判断是否为主程序入口
if __name__ == '__main__':
    # 记录服务启动日志
    logger.info("启动Flask服务...")
    # 启动Flask应用,监听所有IP的5000端口,开启调试模式
    app.run(host='0.0.0.0', port=5000, debug=True)

11.匹配分析 #

pip install openai

11.1. .env #

.env

# Chrome配置
CHROME_PATH=C:\Program Files\Google\Chrome\Application\chrome.exe
SEARCH_KEYWORD=AI前端

# 数据库配置
MYSQL_HOST=localhost
MYSQL_USER=root
MYSQL_PASSWORD=123456
MYSQL_PORT=3306
MYSQL_DB=jobs
+
# API_KEY
+DEEPSEEK_API_KEY=sk-8146a78d6fdc4028849fbd487c551e01

11.2. job.html #

templates/job.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <title>简历上传</title>
</head>

<body>
  <h2>上传简历(仅支持PDF和Word)</h2>
  <form id="uploadForm" enctype="multipart/form-data">
    <input type="file" name="resume" accept=".pdf,.docx" required />
    <button type="submit">上传</button>
  </form>
  <h3>简历内容:</h3>
+ <div id="analysisText"></div>
  <script>
    document.getElementById('uploadForm').onsubmit = async function (e) {
      e.preventDefault();
      const formData = new FormData(this);
      const res = await fetch('/job/upload', {
        method: 'POST',
        body: formData
      });
      const data = await res.json();
      if (data.success) {
+       const results = data.results;
+       console.log(results);
+       const result = results.map(item => `
+         <div>
+           <h3>${item.job_name}</h3>
+           <p>${item.company}</p>
+           <p>${item.analysis}</p>
+         </div>
+       `).join('');
+       document.getElementById('analysisText').innerHTML = result;
      } else {
+       document.getElementById('analysisText').textContent = '分析失败:' + data.error;
      }
    }
  </script>
</body>

</html>

11.3. server.py #

server.py

# 导入uuid模块,用于生成唯一文件名
import uuid
# 从Flask框架导入Flask、jsonify、request、render_template
from flask import Flask, jsonify, request, render_template
# 从database模块导入数据库会话管理器和Job模型
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 从Flask框架导入Response类,用于自定义HTTP响应
from flask import Response
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于文件和路径操作
import os
# 导入Document类,用于解析docx文档
from docx import Document
# 导入pdfplumber库,用于解析pdf文档
import pdfplumber
# 导入openai库,用于调用OpenAI大模型
+from openai import OpenAI
+
# 配置日志输出格式和日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前模块的logger对象
logger = logging.getLogger(__name__)

# 创建Flask应用实例
app = Flask(__name__)
# 设置上传文件保存目录
app.config['UPLOAD_FOLDER'] = 'uploads'
# 设置最大上传文件大小为10MB
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx'}

# 如果上传目录不存在则创建
if not os.path.exists(app.config['UPLOAD_FOLDER']):
    os.makedirs(app.config['UPLOAD_FOLDER'])

# 判断文件扩展名是否允许上传
+def allowed_file(ext):
+   return ext in ALLOWED_EXTENSIONS

# 定义GET请求路由,返回职位上传页面
@app.route('/job', methods=['GET'])
def job_upload_page():
    return render_template('job.html')

# 创建OpenAI客户端实例,配置API密钥和基础URL
+client = OpenAI(
+   api_key=os.getenv('DEEPSEEK_API_KEY', ''),
+   base_url="https://api.deepseek.com/v1" 
+)
+
# 定义大模型分析提示词模板,用于简历与职位匹配度分析
+PROMPT_TEMPLATE = '''你是一个AI前端招聘的技术专家
+请根据下面职位描述和简历的内容,给出是否匹配的明确结论,匹配或者不匹配
+职位信息如下:{job_desc}
+简历内容如下:{resume_text}
+请按以下格式输出分析结果:
+1. 是否符合职位要求(是/否),并简要说明原因
+2. 技术能力分析:
+ - 已具备的技术点
+ - 欠缺的技术点
+3. 改进建议:
+ - 具体的学习路径
+ - 推荐的学习资源
+ - 预计达到要求的时间周期
+注意:请只关注技术能力方面的匹配度,忽略城市、学历等其他因素'''
+
# 定义文件内容提取函数,接收文件路径和扩展名参数
+def extract_resume_text(save_path, ext):
    # 初始化文本变量为空字符串
+   text = ''
    # 判断文件扩展名是否为PDF格式
+   if ext == 'pdf':
        # 使用pdfplumber库打开PDF文件
+       with pdfplumber.open(save_path) as pdf:
            # 遍历PDF的每一页,提取文本内容并用换行符连接
+           text = "\n".join(page.extract_text() or '' for page in pdf.pages)
    # 判断文件扩展名是否为DOCX格式
+   elif ext == 'docx':
        # 使用python-docx库打开Word文档
+       doc = Document(save_path)
        # 提取文档中所有段落的文本内容到列表中
+       text_list = [para.text for para in doc.paragraphs]
        # 将段落文本用换行符连接成完整文本
+       text = "\n".join(text_list)
    # 如果文件格式既不是PDF也不是DOCX
+   else:
        # 抛出值错误异常,提示文件类型不支持
+       raise ValueError('文件类型不支持')
    # 返回去除首尾空白字符的文本内容
+   return text.strip()
+
# 定义大模型分析函数,接收职位描述和简历文本作为参数
+def analyze_with_llm(job_desc, resume_text):
    # 使用提示词模板格式化职位描述和简历文本
+   prompt = PROMPT_TEMPLATE.format(job_desc=job_desc, resume_text=resume_text)
    # 尝试调用大模型API进行分析
+   try:
        # 创建聊天完成请求,使用deepseek-chat模型
+       response = client.chat.completions.create(
            # 指定使用的模型名称
+           model="deepseek-chat",
            # 设置消息格式,包含用户角色和提示内容
+           messages=[
+               {"role": "system", "content": "你是一个"+os.getenv('SEARCH_KEYWORD','AI前端')+"招聘的技术专家"},
+               {"role": "user", "content": prompt}
+           ],
            # 禁用流式输出,等待完整响应
+           stream=False
+       )
        # 返回模型响应的内容,并去除首尾空白字符
+       return response.choices[0].message.content.strip()
    # 捕获并处理可能出现的异常
+   except Exception as e:
        # 记录错误日志,包含具体的错误信息
+       logger.error(f'大模型分析失败: {e}')
        # 返回错误信息给调用方
+       return f'分析失败: {e}'
+
# 定义POST请求路由,处理简历文件上传
@app.route('/job/upload', methods=['POST'])
def upload_resume():
    # 检查请求中是否包含名为'resume'的文件
    if 'resume' not in request.files:
        return jsonify({'success': False, 'error': '未选择文件'})
    # 获取上传的文件对象
    file = request.files['resume']
    # 检查文件名是否为空,表示用户没有选择文件
    if file.filename == '':
        return jsonify({'success': False, 'error': '未选择文件'})
    # 提取文件扩展名,转换为小写并去除点号
    ext = os.path.splitext(file.filename)[1].lower().lstrip('.')
    # 检查文件类型是否在允许的格式列表中
    if not allowed_file(ext):
        return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
    # 生成唯一的文件名,使用UUID避免文件名冲突
    filename = f"{uuid.uuid4().hex}.{ext}"
    # 构建完整的文件保存路径
    save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    # 将文件保存到指定路径
    file.save(save_path)
    # 开始异常处理,确保文件操作的安全性
    try:
        # 调用文本提取函数,从上传的文件中提取文本内容
+       resume_text = extract_resume_text(save_path, ext)
        # 使用数据库会话上下文管理器,自动管理数据库连接
+       with get_db_session() as session:
            # 查询数据库中的职位信息,限制只获取1条记录
+           jobs = session.query(Job).limit(1).all()
            # 初始化结果列表,用于存储分析结果
+           results = []
            # 遍历每个职位记录
+           for job in jobs:
+               # 获取职位描述,如果为空则使用空字符串
+               job_desc = job.description or ''
+               # 调用大模型分析函数,分析简历与职位的匹配度
+               analysis = analyze_with_llm(job_desc, resume_text)
+               # 记录分析结果的日志信息
+               logger.info(f'分析结果: {analysis}')
+               # 将分析结果添加到结果列表中
+               results.append({
+                   'job_id': job.id,
+                   'job_name': job.job_name,
+                   'company': job.company,
+                   'analysis': analysis
+               })
        # 返回成功响应,包含分析结果
+       return jsonify({'success': True, 'results': results})
    # 捕获并处理可能出现的异常
    except Exception as e:
        # 记录错误日志,包含具体的错误信息
+       logger.error(f'简历解析或分析失败: {e}')
        # 返回错误响应,包含错误信息
        return jsonify({'success': False, 'error': str(e)})

# 定义GET请求路由,获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
    try:
        # 使用数据库会话上下文管理器,自动管理数据库连接
        with get_db_session() as session:
            # 查询所有职位记录
            jobs = session.query(Job).all()
            # 创建空列表用于存储职位数据
            jobs_list = []
            # 遍历每个职位对象,转换为字典格式
            for job in jobs:
                jobs_list.append({
                    'id': job.id,
                    'job_name': job.job_name,
                    'company': job.company,
                    'city': job.city,
                    'experience': job.experience,
                    'education': job.education,
                    'description': job.description
                })
            # 记录获取职位信息的日志
            logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
            # 返回JSON格式的职位数据
            return Response(
                json.dumps(jobs_list, ensure_ascii=False),
                mimetype='application/json'
            )
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"获取职位列表失败: {e}")
        # 返回错误响应,状态码500
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

# 判断是否为主程序入口
if __name__ == '__main__':
    # 记录服务启动日志
    logger.info("启动Flask服务...")
    # 启动Flask应用,监听所有IP的5000端口,开启调试模式
    app.run(host='0.0.0.0', port=5000, debug=True)

12.markdown #

12.1. job.html #

templates/job.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <title>简历上传</title>
+ <script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</head>

<body>
  <h2>上传简历(仅支持PDF和Word)</h2>
  <form id="uploadForm" enctype="multipart/form-data">
    <input type="file" name="resume" accept=".pdf,.docx" required />
    <button type="submit">上传</button>
  </form>
  <h3>简历内容:</h3>
  <div id="analysisText"></div>
  <script>
    document.getElementById('uploadForm').onsubmit = async function (e) {
      e.preventDefault();
      const formData = new FormData(this);
      const res = await fetch('/job/upload', {
        method: 'POST',
        body: formData
      });
      const data = await res.json();
      if (data.success) {
        const results = data.results;
        const result = results.map(item => `
          <div>
            <h3>${item.job_name}</h3>
            <p>${item.company}</p>
+           <div>${marked.parse(item.analysis)}</div>
          </div>
        `).join('');
        document.getElementById('analysisText').innerHTML = result;
      } else {
        document.getElementById('analysisText').textContent = '分析失败:' + data.error;
      }
    }
  </script>
</body>

</html>

13.美化输出 #

13.1. job.html #

templates/job.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <title>简历上传</title>
+ <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/github-markdown-css@5.5.1/github-markdown.min.css">
  <script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</head>

<body>
  <h2>上传简历(仅支持PDF和Word)</h2>
  <form id="uploadForm" enctype="multipart/form-data">
    <input type="file" name="resume" accept=".pdf,.docx" required />
    <button type="submit">上传</button>
  </form>
  <h3>简历内容:</h3>
  <div id="analysisText"></div>
  <script>
    document.getElementById('uploadForm').onsubmit = async function (e) {
      e.preventDefault();
      const formData = new FormData(this);
      const res = await fetch('/job/upload', {
        method: 'POST',
        body: formData
      });
      const data = await res.json();
      if (data.success) {
        const results = data.results;
        const result = results.map(item => `
          <div>
            <h3>${item.job_name}</h3>
            <p>${item.company}</p>
+           <div class="analysis-content markdown-body">${marked.parse(item.analysis)}</div>
          </div>
        `).join('');
        document.getElementById('analysisText').innerHTML = result;
      } else {
        document.getElementById('analysisText').textContent = '分析失败:' + data.error;
      }
    }
  </script>
</body>

</html>

14.流式输出 #

14.1. job.html #

templates/job.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <title>简历上传</title>
  <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/github-markdown-css@5.5.1/github-markdown.min.css">
  <script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</head>

<body>
  <h2>上传简历(仅支持PDF和Word)</h2>
  <form id="uploadForm" enctype="multipart/form-data">
    <input type="file" name="resume" accept=".pdf,.docx" required />
    <button type="submit">上传</button>
  </form>
  <h3>简历内容:</h3>
+ <div id="analysisText" style="padding: 50px;"></div>
  <script>
+   // 定义滚动到底部的函数
    function scrollToBottom() {
+     // 获取分析文本容器元素
      const analysisDiv = document.getElementById('analysisText');
+     // 将容器的滚动位置设置到底部
      analysisDiv.scrollTop = analysisDiv.scrollHeight;
+     // 平滑滚动整个页面到底部
      window.scrollTo({ top: document.body.scrollHeight, behavior: 'smooth' });
    }
+
+   // 为上传表单添加提交事件监听器
    document.getElementById('uploadForm').onsubmit = async function (e) {
+     // 阻止表单默认提交行为
      e.preventDefault();
+     // 创建FormData对象来收集表单数据
      const formData = new FormData(this);
+     // 清空分析文本容器
      document.getElementById('analysisText').innerHTML = '';
+     // 发送POST请求到流式上传接口
      const res = await fetch('/job/upload/stream', {
        method: 'POST',
        body: formData
      });
+     // 获取响应体的读取器
      const reader = res.body.getReader();
+     // 创建文本解码器
      const decoder = new TextDecoder();
+     // 存储每个职位分析内容的映射对象
      let jobMap = {};
+     // 当前正在处理的职位ID
      let currentJobId = null;
+     // 当前正在处理的职位div元素
      let currentJobDiv = null;
+
+     // 循环读取流式数据
      while (true) {
+       // 读取下一个数据块
        const { done, value } = await reader.read();
+       // 如果读取完成则退出循环
        if (done) break;
+       // 解码数据块为文本
        const chunk = decoder.decode(value);
+       // 按行分割数据
        const lines = chunk.split('\n');
+
+       // 遍历每一行数据
        for (const line of lines) {
+         // 检查是否是数据行(以'data: '开头)
          if (line.startsWith('data: ')) {
+           // 解析JSON数据
            const data = JSON.parse(line.slice(6));
+
+           // 处理开始分析的消息
            if (data.type === 'start') {
+             // 设置当前职位ID
              currentJobId = data.job_id;
+             // 创建新的职位div元素
              currentJobDiv = document.createElement('div');
+             // 设置职位div的HTML内容
              currentJobDiv.innerHTML = `
                <h3>${data.job_name}</h3>
                <p>${data.company}</p>
                <div class="analysis-content markdown-body" id="analysis-${currentJobId}"></div>
              `;
+             // 将职位div添加到分析文本容器中
              document.getElementById('analysisText').appendChild(currentJobDiv);
+             // 初始化该职位的分析内容
              jobMap[currentJobId] = '';
+             // 滚动到底部
              scrollToBottom();
+           }
+           // 处理内容更新的消息
+           else if (data.type === 'content') {
+             // 累加该职位的分析内容
              jobMap[data.job_id] += data.content;
+             // 将Markdown内容转换为HTML并显示
              document.getElementById(`analysis-${data.job_id}`).innerHTML = marked.parse(jobMap[data.job_id]);
+             // 滚动到底部
              scrollToBottom();
+           }
+           // 处理错误消息
+           else if (data.type === 'error') {
+             // 在分析文本容器中添加错误信息
              document.getElementById('analysisText').innerHTML += `<div style="color:red;">分析失败:${data.error}</div>`;
+             // 滚动到底部
              scrollToBottom();
            }
          }
        }
      }
    }
  </script>
</body>

</html>

14.2. server.py #

server.py

# 导入uuid模块,用于生成唯一文件名
import uuid
# 从Flask框架导入Flask、jsonify、request、render_template
from flask import Flask, jsonify, request, render_template, stream_with_context
# 从database模块导入数据库会话管理器和Job模型
from database import get_db_session, Job
# 导入json模块,用于处理JSON数据
import json
# 从Flask框架导入Response类,用于自定义HTTP响应
from flask import Response
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于文件和路径操作
import os
# 导入Document类,用于解析docx文档
from docx import Document
# 导入pdfplumber库,用于解析pdf文档
import pdfplumber
# 导入openai库,用于调用OpenAI大模型
from openai import OpenAI

# 配置日志输出格式和日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前模块的logger对象
logger = logging.getLogger(__name__)

# 创建Flask应用实例
app = Flask(__name__)
# 设置上传文件保存目录
app.config['UPLOAD_FOLDER'] = 'uploads'
# 设置最大上传文件大小为10MB
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
# 允许上传的文件扩展名集合
ALLOWED_EXTENSIONS = {'pdf', 'docx'}

# 如果上传目录不存在则创建
if not os.path.exists(app.config['UPLOAD_FOLDER']):
    os.makedirs(app.config['UPLOAD_FOLDER'])

# 判断文件扩展名是否允许上传
def allowed_file(ext):
    return ext in ALLOWED_EXTENSIONS

# 定义GET请求路由,返回职位上传页面
@app.route('/job', methods=['GET'])
def job_upload_page():
    return render_template('job.html')

# 创建OpenAI客户端实例,配置API密钥和基础URL
client = OpenAI(
    api_key=os.getenv('DEEPSEEK_API_KEY', ''),
    base_url="https://api.deepseek.com/v1" 
)

# 定义大模型分析提示词模板,用于简历与职位匹配度分析
PROMPT_TEMPLATE = '''你是一个AI前端招聘的技术专家
请根据下面职位描述和简历的内容,给出是否匹配的明确结论,匹配或者不匹配
职位信息如下:{job_desc}
简历内容如下:{resume_text}
请按以下格式输出分析结果:
1. 是否符合职位要求(是/否),并简要说明原因
2. 技术能力分析:
  - 已具备的技术点
  - 欠缺的技术点
3. 改进建议:
  - 具体的学习路径
  - 推荐的学习资源
  - 预计达到要求的时间周期
注意:请只关注技术能力方面的匹配度,忽略城市、学历等其他因素'''

# 定义文件内容提取函数,接收文件路径和扩展名参数
def extract_resume_text(save_path, ext):
    # 初始化文本变量为空字符串
    text = ''
    # 判断文件扩展名是否为PDF格式
    if ext == 'pdf':
        # 使用pdfplumber库打开PDF文件
        with pdfplumber.open(save_path) as pdf:
            # 遍历PDF的每一页,提取文本内容并用换行符连接
            text = "\n".join(page.extract_text() or '' for page in pdf.pages)
    # 判断文件扩展名是否为DOCX格式
    elif ext == 'docx':
        # 使用python-docx库打开Word文档
        doc = Document(save_path)
        # 提取文档中所有段落的文本内容到列表中
        text_list = [para.text for para in doc.paragraphs]
        # 将段落文本用换行符连接成完整文本
        text = "\n".join(text_list)
    # 如果文件格式既不是PDF也不是DOCX
    else:
        # 抛出值错误异常,提示文件类型不支持
        raise ValueError('文件类型不支持')
    # 返回去除首尾空白字符的文本内容
    return text.strip()

# 定义大模型分析函数,接收职位描述和简历文本作为参数
def analyze_with_llm(job_desc, resume_text):
    # 使用提示词模板格式化职位描述和简历文本
    prompt = PROMPT_TEMPLATE.format(job_desc=job_desc, resume_text=resume_text)
    # 尝试调用大模型API进行分析
    try:
        # 创建聊天完成请求,使用deepseek-chat模型
        response = client.chat.completions.create(
            # 指定使用的模型名称
            model="deepseek-chat",
            # 设置消息格式,包含用户角色和提示内容
            messages=[
                {"role": "system", "content": "你是一个"+os.getenv('SEARCH_KEYWORD','AI前端')+"招聘的技术专家"},
                {"role": "user", "content": prompt}
            ],
            # 禁用流式输出,等待完整响应
            stream=False
        )
        # 返回模型响应的内容,并去除首尾空白字符
        return response.choices[0].message.content.strip()
    # 捕获并处理可能出现的异常
    except Exception as e:
        # 记录错误日志,包含具体的错误信息
        logger.error(f'大模型分析失败: {e}')
        # 返回错误信息给调用方
        return f'分析失败: {e}'

# 定义POST请求路由,处理简历文件上传
@app.route('/job/upload', methods=['POST'])
def upload_resume():
    # 检查请求中是否包含名为'resume'的文件
    if 'resume' not in request.files:
        return jsonify({'success': False, 'error': '未选择文件'})
    # 获取上传的文件对象
    file = request.files['resume']
    # 检查文件名是否为空,表示用户没有选择文件
    if file.filename == '':
        return jsonify({'success': False, 'error': '未选择文件'})
    # 提取文件扩展名,转换为小写并去除点号
    ext = os.path.splitext(file.filename)[1].lower().lstrip('.')
    # 检查文件类型是否在允许的格式列表中
    if not allowed_file(ext):
        return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
    # 生成唯一的文件名,使用UUID避免文件名冲突
    filename = f"{uuid.uuid4().hex}.{ext}"
    # 构建完整的文件保存路径
    save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    # 将文件保存到指定路径
    file.save(save_path)
    # 开始异常处理,确保文件操作的安全性
    try:
        # 调用文本提取函数,从上传的文件中提取文本内容
        resume_text = extract_resume_text(save_path, ext)
        # 使用数据库会话上下文管理器,自动管理数据库连接
        with get_db_session() as session:
            # 查询数据库中的职位信息,限制只获取1条记录
            jobs = session.query(Job).limit(1).all()
            # 初始化结果列表,用于存储分析结果
            results = []
            # 遍历每个职位记录
            for job in jobs:
                # 获取职位描述,如果为空则使用空字符串
                job_desc = job.description or ''
                # 调用大模型分析函数,分析简历与职位的匹配度
                analysis = analyze_with_llm(job_desc, resume_text)
                # 记录分析结果的日志信息
                logger.info(f'分析结果: {analysis}')
                # 将分析结果添加到结果列表中
                results.append({
                    'job_id': job.id,
                    'job_name': job.job_name,
                    'company': job.company,
                    'analysis': analysis
                })
        # 返回成功响应,包含分析结果
        return jsonify({'success': True, 'results': results})
    # 捕获并处理可能出现的异常
    except Exception as e:
        # 记录错误日志,包含具体的错误信息
        logger.error(f'简历解析或分析失败: {e}')
        # 返回错误响应,包含错误信息
        return jsonify({'success': False, 'error': str(e)})

# 定义GET请求路由,获取所有职位信息
@app.route('/api/jobs', methods=['GET'])
def get_jobs():
    try:
        # 使用数据库会话上下文管理器,自动管理数据库连接
        with get_db_session() as session:
            # 查询所有职位记录
            jobs = session.query(Job).all()
            # 创建空列表用于存储职位数据
            jobs_list = []
            # 遍历每个职位对象,转换为字典格式
            for job in jobs:
                jobs_list.append({
                    'id': job.id,
                    'job_name': job.job_name,
                    'company': job.company,
                    'city': job.city,
                    'experience': job.experience,
                    'education': job.education,
                    'description': job.description
                })
            # 记录获取职位信息的日志
            logger.info(f"成功获取 {len(jobs_list)} 个职位信息")
            # 返回JSON格式的职位数据
            return Response(
                json.dumps(jobs_list, ensure_ascii=False),
                mimetype='application/json'
            )
    except Exception as e:
        # 捕获异常并记录错误日志
        logger.error(f"获取职位列表失败: {e}")
        # 返回错误响应,状态码500
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

+# 定义流式上传简历的路由,处理POST请求
+@app.route('/job/upload/stream', methods=['POST'])
+def upload_resume_stream():
+    # 检查请求中是否包含简历文件
+    if 'resume' not in request.files:
+        return jsonify({'success': False, 'error': '未选择文件'})
+   # 获取上传的文件对象
+   file = request.files['resume']
+   # 检查文件名是否为空
+   if file.filename == '':
+       return jsonify({'success': False, 'error': '未选择文件'})
+   # 提取文件扩展名并转换为小写,去除点号
+   ext = os.path.splitext(file.filename)[1].lower().lstrip('.')
+   # 验证文件类型是否允许
+   if not allowed_file(ext):
+       return jsonify({'success': False, 'error': '仅支持pdf和docx文件'})
+   # 生成唯一的文件名
+   filename = f"{uuid.uuid4().hex}.{ext}"
+   # 构建完整的文件保存路径
+   save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
+   # 保存上传的文件到指定路径
+   file.save(save_path)

+   # 定义生成器函数,用于流式输出分析结果
+   def generate():
+       try:
+           # 提取简历文本内容
+           resume_text = extract_resume_text(save_path, ext)
+           # 使用数据库会话上下文管理器
+           with get_db_session() as session:
+               # 查询所有职位信息
+               jobs = session.query(Job).all()
+               # 遍历每个职位进行分析
+               for job in jobs:
+                   # 获取职位描述,如果为空则使用空字符串
+                   job_desc = job.description or ''
+                   # 格式化提示词模板
+                   prompt = PROMPT_TEMPLATE.format(job_desc=job_desc, resume_text=resume_text)
+                   try:
+                       # 调用AI接口进行流式分析
+                       response = client.chat.completions.create(
+                           model="deepseek-chat",
+                           messages=[
+                               {"role": "system", "content": "你是一个"+os.getenv('SEARCH_KEYWORD','AI前端')+"招聘的技术专家"},
+                               {"role": "user", "content": prompt}
+                           ],
+                           stream=True
+                       )
+                       # 发送开始分析的数据流
+                       yield f'data: ' + json.dumps({"job_id": job.id, "job_name": job.job_name, "company": job.company, "type": "start"}, ensure_ascii=False) + '\n\n'
+                       # 遍历AI响应的每个数据块
+                       for chunk in response:
+                           # 检查是否有内容输出
+                           if chunk.choices[0].delta.content:
+                               # 获取内容片段
+                               content = chunk.choices[0].delta.content
+                                # 发送内容数据流
+                               yield f'data: ' + json.dumps({"job_id": job.id, "type": "content", "content": content}, ensure_ascii=False) + '\n\n'
+                       # 发送分析完成的数据流
+                       yield f'data: ' + json.dumps({"job_id": job.id, "type": "end"}, ensure_ascii=False) + '\n\n'
+                   except Exception as e:
+                       # 发送错误信息的数据流
+                       yield f'data: ' + json.dumps({"job_id": job.id, "type": "error", "error": str(e)}, ensure_ascii=False) + '\n\n'
+              # 发送所有分析完成的数据流
+               yield f'data: ' + json.dumps({"type": "all_complete"}, ensure_ascii=False) + '\n\n'
+       except Exception as e:
+           # 发送整体错误信息的数据流
+           yield f'data: ' + json.dumps({"type": "error", "error": str(e)}, ensure_ascii=False) + '\n\n'

+   # 返回流式响应,设置MIME类型为事件流
+   return app.response_class(stream_with_context(generate()), mimetype='text/event-stream')

# 判断是否为主程序入口
if __name__ == '__main__':
    # 记录服务启动日志
    logger.info("启动Flask服务...")
    # 启动Flask应用,监听所有IP的5000端口,开启调试模式
    app.run(host='0.0.0.0', port=5000, debug=True)

访问验证

请输入访问令牌

Token不正确,请重新输入