硅片爬虫代码(仅个人用于学习记录)
import csv
import pymysql # 替换mysql-connector,兼容性更好
from pymysql import OperationalError
from datetime import datetime
from html import unescape
import requests
from bs4 import BeautifulSoup
import logging
全局配置
target_items = [
“多晶硅 致密料 (RMB)”,
“多晶硅 颗粒料 (RMB)”,
“单晶N型硅片 - 182-183.75mm / 130µm (RMB)”,
“单晶N型硅片 - 182210mm / 130µm (RMB)”,
“182182-210mm 单晶TOPCon组件 (RMB)”
]
target_url = “https://www.infolink-group.com/spot-price/cn/“
MySQL配置(PyMySQL无需ssl参数,默认禁用SSL)
mysql_config = {
“host”: “127.0.0.1”, # 本地TCP连接
“port”: 3306, # 与监听端口一致
“user”: “root”, # 数据库用户名
“password”: “a123456”, # 替换为你的密码
“database”: “silicon_price_db”, # 数据库名
“charset”: “utf8mb4”,
“connect_timeout”: 5 # 超时5秒,快速反馈
}
headers = {
“User-Agent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36”
} # 模拟浏览器请求
MySQL核心工具函数
def create_db_connection():
“””创建MySQL连接(使用PyMySQL,适配MySQL 8.0+)”””
connection = None
try:
print(“\n【MySQL连接步骤1/3】尝试用PyMySQL登录本地MySQL…”)
print(f” 连接参数:host={mysql_config[‘host’]}, port={mysql_config[‘port’]}, user={mysql_config[‘user’]}”)
print(“ ⏳ 等待响应(超时5秒)…”)
# 步骤1:先登录MySQL(不指定数据库,用于创建库)
temp_conn = pymysql.connect(
host=mysql_config["host"],
port=mysql_config["port"],
user=mysql_config["user"],
password=mysql_config["password"],
charset=mysql_config["charset"],
connect_timeout=mysql_config["connect_timeout"]
)
print("✅ PyMySQL登录成功(未指定数据库)")
# 步骤2:创建目标数据库(若不存在)
print(f"【MySQL连接步骤2/3】创建数据库 {mysql_config['database']}...")
with temp_conn.cursor() as cursor:
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {mysql_config['database']} CHARACTER SET utf8mb4;")
temp_conn.commit()
temp_conn.close() # 关闭临时连接
print(f"✅ 数据库 {mysql_config['database']} 已就绪")
# 步骤3:连接到目标数据库
print(f"【MySQL连接步骤3/3】连接到数据库 {mysql_config['database']}...")
connection = pymysql.connect(** mysql_config)
print("🎉 MySQL全流程连接成功!")
except OperationalError as e:
error_code, error_msg = e.args
print(f"\n❌ MySQL连接失败(错误码:{error_code}):{error_msg}")
# 针对常见错误的解决方案
if error_code == 1045:
print("💡 可能原因:密码错误或用户无TCP登录权限")
print(" - 确认密码正确;")
print(" - 登录MySQL执行:ALTER USER 'root'@'127.0.0.1' IDENTIFIED WITH mysql_native_password BY '你的密码';")
elif error_code == 2003:
print("💡 可能原因:MySQL服务未启动或端口被拦截")
else:
print("💡 参考解决方案:")
print(" - 确保MySQL已重启(修改配置后需重启);")
print(" - 检查MySQL错误日志(data目录下的.err文件)")
except Exception as e:
print(f"\n❌ 连接过程异常:{str(e)}(可能是PyMySQL库未安装)")
print(" 解决:执行 pip install pymysql 安装库")
return connection
创建数据表函数
def create_price_table(connection):
“””创建数据表(确保结构正确)”””
if not connection:
print(“❌ 跳过建表:MySQL连接未建立”)
return False
create_sql = """
CREATE TABLE IF NOT EXISTS silicon_prices (
id INT AUTO_INCREMENT PRIMARY KEY,
extract_time DATE NOT NULL,
item_name VARCHAR(255) NOT NULL,
high_price DECIMAL(10,3),
low_price DECIMAL(10,3),
avg_price DECIMAL(10,3),
size_192 DECIMAL(10,3),
size_193 DECIMAL(10,3),
size_197 DECIMAL(10,3),
UNIQUE KEY unique_record (extract_time, item_name) # 避免重复插入
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
try:
with connection.cursor() as cursor:
cursor.execute(create_sql)
connection.commit()
print("✅ 数据表silicon_prices已创建/验证")
return True
except Exception as e:
print(f"❌ 建表失败:{str(e)}")
return False
数据插入函数
def insert_price_data(connection, results):
“””插入数据(处理无效值和重复数据)”””
if not connection:
print(“❌ 跳过插入:MySQL连接未建立”)
return
# 筛选有效数据(均价非空)
valid_count = sum(1 for res in results if res["均价"] not in ["未找到", "--", ""])
if valid_count == 0:
print("⚠️ 无有效数据可插入(检查网页提取是否正常)")
return
insert_sql = """
INSERT IGNORE INTO silicon_prices (
extract_time, item_name, high_price, low_price, avg_price,
size_192, size_193, size_197
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
# 处理数据格式(转换为数值或None)
data_list = []
for res in results:
def to_float(val):
if val in ["未找到", "--", "", None]:
return None
try:
return float(val)
except:
return None
data_list.append((
res["提取时间"],
res["项目名称"],
to_float(res["高点"]),
to_float(res["低点"]),
to_float(res["均价"]),
to_float(res["192"]),
to_float(res["193"]),
to_float(res["197"])
))
try:
with connection.cursor() as cursor:
cursor.executemany(insert_sql, data_list)
connection.commit()
print(f"✅ 数据插入完成,新增{cursor.rowcount}条(0表示重复)")
except Exception as e:
connection.rollback()
print(f"❌ 插入失败:{str(e)}")
网页数据提取函数
def extract_web_data():
print(“\n【网页数据提取】开始…”)
extract_time = datetime.now().strftime(“%Y-%m-%d”)
# 初始化结果列表
results = [
{
“提取时间”: extract_time,
“项目名称”: item,
“高点”: “未找到”,
“低点”: “未找到”,
“均价”: “未找到”,
“192”: “未找到”,
“193”: “未找到”,
“197”: “未找到”
} for item in target_items
]
try:
# 请求网页(超时5秒)
response = requests.get(target_url, headers=headers, timeout=5)
response.raise_for_status() # 触发HTTP错误(如404)
# 保存源码(调试用)
with open("网页源码.html", "w", encoding="utf-8") as f:
f.write(response.text)
print("✅ 网页源码已保存(调试用)")
# 解析HTML
soup = BeautifulSoup(response.text, "html.parser")
target_divs = soup.find_all("div", class_="tb-wrap tb02")
if not target_divs:
print("❌ 未找到目标表格(div.tb-wrap.tb02)")
return results
# 提取数据
matched_count = 0
for div in target_divs:
table = div.find("table")
if not table:
continue
# 遍历数据行(排除表头)
for row in [r for r in table.find_all("tr") if not r.find("th")]:
tds = row.find_all("td")
if len(tds) < 4:
continue # 确保有足够的列
# 提取项目名称
item_name = unescape(tds[0].get_text(strip=True))
# 匹配目标项目
for res in results:
if res["项目名称"] == item_name:
# 提取高点、低点、均价
res["高点"] = tds[1].get_text(strip=True)
res["低点"] = tds[2].get_text(strip=True)
avg_str = tds[3].get_text(strip=True)
res["均价"] = avg_str
matched_count += 1
print(f" ✅ 匹配到:{item_name} → 均价:{avg_str}")
# 计算192/193/197尺寸价格
if avg_str not in ["未找到", "--", ""]:
try:
avg = float(avg_str)
res["192"] = round(avg / 183.75 * 192.6+0.03, 3)
res["193"] = round(avg / 182.2 * 183 / 187.75 * 193.6+0.09, 3)
res["197"] = round(avg / 182.2 * 183 / 187.75 * 197.3+0.09, 3)
except:
print(f" ⚠️ 计算失败:{item_name}的均价不是有效数字")
break # 匹配后退出循环
print(f"【网页数据提取】完成,匹配到{matched_count}/{len(target_items)}个项目")
except requests.exceptions.RequestException as e:
print(f"【网页数据提取】失败:{str(e)}(网络错误)")
except Exception as e:
print(f"【网页数据提取】失败:{str(e)}(解析错误)")
"""
# 保存CSV备份(无论MySQL是否成功)
csv_filename = f"硅片价格汇总_{extract_time}.csv"
with open(csv_filename, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=results[0].keys())
writer.writeheader()
writer.writerows(results)
print(f"【网页数据提取】CSV已保存:{csv_filename}")
"""
return results
主函数(全流程执行)
def main():
print(“=”*60)
print(f”程序启动:{datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}”)
print(“=”*60)
# 步骤1:提取网页数据
results = extract_web_data()
# 步骤2:MySQL操作(连接→建表→插入)
print("\n" + "="*60)
print("开始MySQL操作...")
conn = create_db_connection()
if conn:
create_price_table(conn)
insert_price_data(conn, results)
conn.close() # 关闭连接
print("✅ MySQL连接已关闭")
else:
print("❌ MySQL操作终止(连接未建立)")
# 执行总结
print("\n" + "="*60)
print("程序执行完成!")
print("1. 网页数据已保存到CSV(即使MySQL失败)")
print("2. MySQL状态:" + ("成功" if conn else "失败,请查看错误提示"))
print("="*60)
程序入口
if name == “main“:
main()
配置日志(写入绝对路径,确保有权限)
logging.basicConfig(
filename=”C:\task_execution.log”,
level=logging.DEBUG,
format=”%(asctime)s - %(levelname)s - %(message)s”
)
在关键步骤添加日志
logging.info(“开始网页爬取…”)
try:
response = requests.get(target_url, headers=headers, timeout=5)
logging.info(f”网页请求成功,状态码:{response.status_code}”)
except Exception as e:
logging.error(f”网页请求失败:{str(e)}”) # 记录网络错误
logging.info(“开始连接MySQL…”)
try:
conn = create_db_connection()
logging.info(“MySQL连接成功”)
except Exception as e:
logging.error(f”MySQL连接失败:{str(e)}”) # 记录数据库错误
