import csv
import pymysql # 替换mysql-connector,兼容性更好
from pymysql import OperationalError
from datetime import datetime
from html import unescape
import requests
from bs4 import BeautifulSoup
import logging

全局配置

target_items = [
“多晶硅 致密料 (RMB)”,
“多晶硅 颗粒料 (RMB)”,
“单晶N型硅片 - 182-183.75mm / 130µm (RMB)”,
“单晶N型硅片 - 182210mm / 130µm (RMB)”,
“182
182-210mm 单晶TOPCon组件 (RMB)”
]

target_url = “https://www.infolink-group.com/spot-price/cn/

MySQL配置(PyMySQL无需ssl参数,默认禁用SSL)

mysql_config = {
“host”: “127.0.0.1”, # 本地TCP连接
“port”: 3306, # 与监听端口一致
“user”: “root”, # 数据库用户名
“password”: “a123456”, # 替换为你的密码
“database”: “silicon_price_db”, # 数据库名
“charset”: “utf8mb4”,
“connect_timeout”: 5 # 超时5秒,快速反馈
}

headers = {
“User-Agent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36”
} # 模拟浏览器请求

MySQL核心工具函数

def create_db_connection():
“””创建MySQL连接(使用PyMySQL,适配MySQL 8.0+)”””
connection = None
try:
print(“\n【MySQL连接步骤1/3】尝试用PyMySQL登录本地MySQL…”)
print(f” 连接参数:host={mysql_config[‘host’]}, port={mysql_config[‘port’]}, user={mysql_config[‘user’]}”)
print(“ ⏳ 等待响应(超时5秒)…”)

    # 步骤1:先登录MySQL(不指定数据库,用于创建库)
    temp_conn = pymysql.connect(
        host=mysql_config["host"],
        port=mysql_config["port"],
        user=mysql_config["user"],
        password=mysql_config["password"],
        charset=mysql_config["charset"],
        connect_timeout=mysql_config["connect_timeout"]
    )
    print("✅ PyMySQL登录成功(未指定数据库)")

    # 步骤2:创建目标数据库(若不存在)
    print(f"【MySQL连接步骤2/3】创建数据库 {mysql_config['database']}...")
    with temp_conn.cursor() as cursor:
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {mysql_config['database']} CHARACTER SET utf8mb4;")
    temp_conn.commit()
    temp_conn.close()  # 关闭临时连接
    print(f"✅ 数据库 {mysql_config['database']} 已就绪")

    # 步骤3:连接到目标数据库
    print(f"【MySQL连接步骤3/3】连接到数据库 {mysql_config['database']}...")
    connection = pymysql.connect(** mysql_config)
    print("🎉 MySQL全流程连接成功!")

except OperationalError as e:
    error_code, error_msg = e.args
    print(f"\n❌ MySQL连接失败(错误码:{error_code}):{error_msg}")
    # 针对常见错误的解决方案
    if error_code == 1045:
        print("💡 可能原因:密码错误或用户无TCP登录权限")
        print("  - 确认密码正确;")
        print("  - 登录MySQL执行:ALTER USER 'root'@'127.0.0.1' IDENTIFIED WITH mysql_native_password BY '你的密码';")
    elif error_code == 2003:
        print("💡 可能原因:MySQL服务未启动或端口被拦截")
    else:
        print("💡 参考解决方案:")
        print("  - 确保MySQL已重启(修改配置后需重启);")
        print("  - 检查MySQL错误日志(data目录下的.err文件)")

except Exception as e:
    print(f"\n❌ 连接过程异常:{str(e)}(可能是PyMySQL库未安装)")
    print("  解决:执行 pip install pymysql 安装库")

return connection

创建数据表函数

def create_price_table(connection):
“””创建数据表(确保结构正确)”””
if not connection:
print(“❌ 跳过建表:MySQL连接未建立”)
return False

create_sql = """
CREATE TABLE IF NOT EXISTS silicon_prices (
    id INT AUTO_INCREMENT PRIMARY KEY,
    extract_time DATE NOT NULL,
    item_name VARCHAR(255) NOT NULL,
    high_price DECIMAL(10,3),
    low_price DECIMAL(10,3),
    avg_price DECIMAL(10,3),
    size_192 DECIMAL(10,3),
    size_193 DECIMAL(10,3),
    size_197 DECIMAL(10,3),
    UNIQUE KEY unique_record (extract_time, item_name)  # 避免重复插入
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""

try:
    with connection.cursor() as cursor:
        cursor.execute(create_sql)
    connection.commit()
    print("✅ 数据表silicon_prices已创建/验证")
    return True
except Exception as e:
    print(f"❌ 建表失败:{str(e)}")
    return False

数据插入函数

def insert_price_data(connection, results):
“””插入数据(处理无效值和重复数据)”””
if not connection:
print(“❌ 跳过插入:MySQL连接未建立”)
return

# 筛选有效数据(均价非空)
valid_count = sum(1 for res in results if res["均价"] not in ["未找到", "--", ""])
if valid_count == 0:
    print("⚠️ 无有效数据可插入(检查网页提取是否正常)")
    return

insert_sql = """
INSERT IGNORE INTO silicon_prices (
    extract_time, item_name, high_price, low_price, avg_price,
    size_192, size_193, size_197
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""

# 处理数据格式(转换为数值或None)
data_list = []
for res in results:
    def to_float(val):
        if val in ["未找到", "--", "", None]:
            return None
        try:
            return float(val)
        except:
            return None

    data_list.append((
        res["提取时间"],
        res["项目名称"],
        to_float(res["高点"]),
        to_float(res["低点"]),
        to_float(res["均价"]),
        to_float(res["192"]),
        to_float(res["193"]),
        to_float(res["197"])
    ))

try:
    with connection.cursor() as cursor:
        cursor.executemany(insert_sql, data_list)
    connection.commit()
    print(f"✅ 数据插入完成,新增{cursor.rowcount}条(0表示重复)")
except Exception as e:
    connection.rollback()
    print(f"❌ 插入失败:{str(e)}")

网页数据提取函数

def extract_web_data():
print(“\n【网页数据提取】开始…”)
extract_time = datetime.now().strftime(“%Y-%m-%d”)
# 初始化结果列表
results = [
{
“提取时间”: extract_time,
“项目名称”: item,
“高点”: “未找到”,
“低点”: “未找到”,
“均价”: “未找到”,
“192”: “未找到”,
“193”: “未找到”,
“197”: “未找到”
} for item in target_items
]

try:
    # 请求网页(超时5秒)
    response = requests.get(target_url, headers=headers, timeout=5)
    response.raise_for_status()  # 触发HTTP错误(如404)
    # 保存源码(调试用)
    with open("网页源码.html", "w", encoding="utf-8") as f:
        f.write(response.text)
    print("✅ 网页源码已保存(调试用)")

    # 解析HTML
    soup = BeautifulSoup(response.text, "html.parser")
    target_divs = soup.find_all("div", class_="tb-wrap tb02")
    if not target_divs:
        print("❌ 未找到目标表格(div.tb-wrap.tb02)")
        return results

    # 提取数据
    matched_count = 0
    for div in target_divs:
        table = div.find("table")
        if not table:
            continue
        # 遍历数据行(排除表头)
        for row in [r for r in table.find_all("tr") if not r.find("th")]:
            tds = row.find_all("td")
            if len(tds) < 4:
                continue  # 确保有足够的列
            # 提取项目名称
            item_name = unescape(tds[0].get_text(strip=True))
            # 匹配目标项目
            for res in results:
                if res["项目名称"] == item_name:
                    # 提取高点、低点、均价
                    res["高点"] = tds[1].get_text(strip=True)
                    res["低点"] = tds[2].get_text(strip=True)
                    avg_str = tds[3].get_text(strip=True)
                    res["均价"] = avg_str
                    matched_count += 1
                    print(f"  ✅ 匹配到:{item_name} → 均价:{avg_str}")

                    # 计算192/193/197尺寸价格
                    if avg_str not in ["未找到", "--", ""]:
                        try:
                            avg = float(avg_str)
                            res["192"] = round(avg / 183.75 * 192.6+0.03, 3)
                            res["193"] = round(avg / 182.2 * 183 / 187.75 * 193.6+0.09, 3)
                            res["197"] = round(avg / 182.2 * 183 / 187.75 * 197.3+0.09, 3)
                        except:
                            print(f"    ⚠️ 计算失败:{item_name}的均价不是有效数字")
                    break  # 匹配后退出循环

    print(f"【网页数据提取】完成,匹配到{matched_count}/{len(target_items)}个项目")

except requests.exceptions.RequestException as e:
    print(f"【网页数据提取】失败:{str(e)}(网络错误)")
except Exception as e:
    print(f"【网页数据提取】失败:{str(e)}(解析错误)")
"""
# 保存CSV备份(无论MySQL是否成功)
csv_filename = f"硅片价格汇总_{extract_time}.csv"
with open(csv_filename, "w", encoding="utf-8-sig", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=results[0].keys())
    writer.writeheader()
    writer.writerows(results)
print(f"【网页数据提取】CSV已保存:{csv_filename}")
"""
return results

主函数(全流程执行)

def main():
print(“=”*60)
print(f”程序启动:{datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}”)
print(“=”*60)

# 步骤1:提取网页数据
results = extract_web_data()

# 步骤2:MySQL操作(连接→建表→插入)
print("\n" + "="*60)
print("开始MySQL操作...")
conn = create_db_connection()
if conn:
    create_price_table(conn)
    insert_price_data(conn, results)
    conn.close()  # 关闭连接
    print("✅ MySQL连接已关闭")
else:
    print("❌ MySQL操作终止(连接未建立)")

# 执行总结
print("\n" + "="*60)
print("程序执行完成!")
print("1. 网页数据已保存到CSV(即使MySQL失败)")
print("2. MySQL状态:" + ("成功" if conn else "失败,请查看错误提示"))
print("="*60)

程序入口

if name == “main“:
main()

配置日志(写入绝对路径,确保有权限)

logging.basicConfig(
filename=”C:\task_execution.log”,
level=logging.DEBUG,
format=”%(asctime)s - %(levelname)s - %(message)s”
)

在关键步骤添加日志

logging.info(“开始网页爬取…”)
try:
response = requests.get(target_url, headers=headers, timeout=5)
logging.info(f”网页请求成功,状态码:{response.status_code}”)
except Exception as e:
logging.error(f”网页请求失败:{str(e)}”) # 记录网络错误

logging.info(“开始连接MySQL…”)
try:
conn = create_db_connection()
logging.info(“MySQL连接成功”)
except Exception as e:
logging.error(f”MySQL连接失败:{str(e)}”) # 记录数据库错误