Python 硅片价格爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import csv
import pymysql # 替换mysql-connector,兼容性更好
from pymysql import OperationalError
from datetime import datetime
from html import unescape
import requests
from bs4 import BeautifulSoup
import logging

# 全局配置
target_items = [
"多晶硅 致密料 (RMB)",
"多晶硅 颗粒料 (RMB)",
"单晶N型硅片 - 182-183.75mm / 130µm (RMB)",
"单晶N型硅片 - 182*210mm / 130µm (RMB)",
"182*182-210mm 单晶TOPCon组件 (RMB)"
]

target_url = "https://www.infolink-group.com/spot-price/cn/"

# MySQL配置(PyMySQL无需ssl参数,默认禁用SSL)
mysql_config = {
"host": "127.0.0.1", # 本地TCP连接
"port": 3306, # 与监听端口一致
"user": "root", # 数据库用户名
"password": "a123456", # 替换为你的密码
"database": "silicon_price_db", # 数据库名
"charset": "utf8mb4",
"connect_timeout": 5 # 超时5秒,快速反馈
}

headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
} # 模拟浏览器请求

# MySQL核心工具函数
def create_db_connection():
"""创建MySQL连接(使用PyMySQL,适配MySQL 8.0+)"""
connection = None
try:
print("\n【MySQL连接步骤1/3】尝试用PyMySQL登录本地MySQL...")
print(f" 连接参数:host={mysql_config['host']}, port={mysql_config['port']}, user={mysql_config['user']}")
print(" ⏳ 等待响应(超时5秒)...")

# 步骤1:先登录MySQL(不指定数据库,用于创建库)
temp_conn = pymysql.connect(
host=mysql_config["host"],
port=mysql_config["port"],
user=mysql_config["user"],
password=mysql_config["password"],
charset=mysql_config["charset"],
connect_timeout=mysql_config["connect_timeout"]
)
print("✅ PyMySQL登录成功(未指定数据库)")

# 步骤2:创建目标数据库(若不存在)
print(f"【MySQL连接步骤2/3】创建数据库 {mysql_config['database']}...")
with temp_conn.cursor() as cursor:
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {mysql_config['database']} CHARACTER SET utf8mb4;")
temp_conn.commit()
temp_conn.close() # 关闭临时连接
print(f"✅ 数据库 {mysql_config['database']} 已就绪")

# 步骤3:连接到目标数据库
print(f"【MySQL连接步骤3/3】连接到数据库 {mysql_config['database']}...")
connection = pymysql.connect(** mysql_config)
print("🎉 MySQL全流程连接成功!")

except OperationalError as e:
error_code, error_msg = e.args
print(f"\n❌ MySQL连接失败(错误码:{error_code}):{error_msg}")
# 针对常见错误的解决方案
if error_code == 1045:
print("💡 可能原因:密码错误或用户无TCP登录权限")
print(" - 确认密码正确;")
print(" - 登录MySQL执行:ALTER USER 'root'@'127.0.0.1' IDENTIFIED WITH mysql_native_password BY '你的密码';")
elif error_code == 2003:
print("💡 可能原因:MySQL服务未启动或端口被拦截")
else:
print("💡 参考解决方案:")
print(" - 确保MySQL已重启(修改配置后需重启);")
print(" - 检查MySQL错误日志(data目录下的.err文件)")

except Exception as e:
print(f"\n❌ 连接过程异常:{str(e)}(可能是PyMySQL库未安装)")
print(" 解决:执行 pip install pymysql 安装库")

return connection

# 创建数据表函数
def create_price_table(connection):
"""创建数据表(确保结构正确)"""
if not connection:
print("❌ 跳过建表:MySQL连接未建立")
return False

create_sql = """
CREATE TABLE IF NOT EXISTS silicon_prices (
id INT AUTO_INCREMENT PRIMARY KEY,
extract_time DATE NOT NULL,
item_name VARCHAR(255) NOT NULL,
high_price DECIMAL(10,3),
low_price DECIMAL(10,3),
avg_price DECIMAL(10,3),
size_192 DECIMAL(10,3),
size_193 DECIMAL(10,3),
size_197 DECIMAL(10,3),
UNIQUE KEY unique_record (extract_time, item_name) # 避免重复插入
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""

try:
with connection.cursor() as cursor:
cursor.execute(create_sql)
connection.commit()
print("✅ 数据表silicon_prices已创建/验证")
return True
except Exception as e:
print(f"❌ 建表失败:{str(e)}")
return False

# 数据插入函数
def insert_price_data(connection, results):
"""插入数据(处理无效值和重复数据)"""
if not connection:
print("❌ 跳过插入:MySQL连接未建立")
return

# 筛选有效数据(均价非空)
valid_count = sum(1 for res in results if res["均价"] not in ["未找到", "--", ""])
if valid_count == 0:
print("⚠️ 无有效数据可插入(检查网页提取是否正常)")
return

insert_sql = """
INSERT IGNORE INTO silicon_prices (
extract_time, item_name, high_price, low_price, avg_price,
size_192, size_193, size_197
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""

# 处理数据格式(转换为数值或None)
data_list = []
for res in results:
def to_float(val):
if val in ["未找到", "--", "", None]:
return None
try:
return float(val)
except:
return None

data_list.append((
res["提取时间"],
res["项目名称"],
to_float(res["高点"]),
to_float(res["低点"]),
to_float(res["均价"]),
to_float(res["192"]),
to_float(res["193"]),
to_float(res["197"])
))

try:
with connection.cursor() as cursor:
cursor.executemany(insert_sql, data_list)
connection.commit()
print(f"✅ 数据插入完成,新增{cursor.rowcount}条(0表示重复)")
except Exception as e:
connection.rollback()
print(f"❌ 插入失败:{str(e)}")


# 网页数据提取函数
def extract_web_data():
print("\n【网页数据提取】开始...")
extract_time = datetime.now().strftime("%Y-%m-%d")
# 初始化结果列表
results = [
{
"提取时间": extract_time,
"项目名称": item,
"高点": "未找到",
"低点": "未找到",
"均价": "未找到",
"192": "未找到",
"193": "未找到",
"197": "未找到"
} for item in target_items
]

try:
# 请求网页(超时5秒)
response = requests.get(target_url, headers=headers, timeout=5)
response.raise_for_status() # 触发HTTP错误(如404)
# 保存源码(调试用)
with open("网页源码.html", "w", encoding="utf-8") as f:
f.write(response.text)
print("✅ 网页源码已保存(调试用)")

# 解析HTML
soup = BeautifulSoup(response.text, "html.parser")
target_divs = soup.find_all("div", class_="tb-wrap tb02")
if not target_divs:
print("❌ 未找到目标表格(div.tb-wrap.tb02)")
return results

# 提取数据
matched_count = 0
for div in target_divs:
table = div.find("table")
if not table:
continue
# 遍历数据行(排除表头)
for row in [r for r in table.find_all("tr") if not r.find("th")]:
tds = row.find_all("td")
if len(tds) < 4:
continue # 确保有足够的列
# 提取项目名称
item_name = unescape(tds[0].get_text(strip=True))
# 匹配目标项目
for res in results:
if res["项目名称"] == item_name:
# 提取高点、低点、均价
res["高点"] = tds[1].get_text(strip=True)
res["低点"] = tds[2].get_text(strip=True)
avg_str = tds[3].get_text(strip=True)
res["均价"] = avg_str
matched_count += 1
print(f" ✅ 匹配到:{item_name} → 均价:{avg_str}")

# 计算192/193/197尺寸价格
if avg_str not in ["未找到", "--", ""]:
try:
avg = float(avg_str)
res["192"] = round(avg / 183.75 * 192.6+0.03, 3)
res["193"] = round(avg / 182.2 * 183 / 187.75 * 193.6+0.09, 3)
res["197"] = round(avg / 182.2 * 183 / 187.75 * 197.3+0.09, 3)
except:
print(f" ⚠️ 计算失败:{item_name}的均价不是有效数字")
break # 匹配后退出循环

print(f"【网页数据提取】完成,匹配到{matched_count}/{len(target_items)}个项目")

except requests.exceptions.RequestException as e:
print(f"【网页数据提取】失败:{str(e)}(网络错误)")
except Exception as e:
print(f"【网页数据提取】失败:{str(e)}(解析错误)")
"""
# 保存CSV备份(无论MySQL是否成功)
csv_filename = f"硅片价格汇总_{extract_time}.csv"
with open(csv_filename, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=results[0].keys())
writer.writeheader()
writer.writerows(results)
print(f"【网页数据提取】CSV已保存:{csv_filename}")
"""
return results


# 主函数(全流程执行)
def main():
print("="*60)
print(f"程序启动:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*60)

# 步骤1:提取网页数据
results = extract_web_data()

# 步骤2:MySQL操作(连接→建表→插入)
print("\n" + "="*60)
print("开始MySQL操作...")
conn = create_db_connection()
if conn:
create_price_table(conn)
insert_price_data(conn, results)
conn.close() # 关闭连接
print("✅ MySQL连接已关闭")
else:
print("❌ MySQL操作终止(连接未建立)")

# 执行总结
print("\n" + "="*60)
print("程序执行完成!")
print("1. 网页数据已保存到CSV(即使MySQL失败)")
print("2. MySQL状态:" + ("成功" if conn else "失败,请查看错误提示"))
print("="*60)

# 程序入口
if __name__ == "__main__":
main()

# 配置日志(写入绝对路径,确保有权限)
logging.basicConfig(
filename="C:\\task_execution.log",
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s"
)

# 在关键步骤添加日志
logging.info("开始网页爬取...")
try:
response = requests.get(target_url, headers=headers, timeout=5)
logging.info(f"网页请求成功,状态码:{response.status_code}")
except Exception as e:
logging.error(f"网页请求失败:{str(e)}") # 记录网络错误

logging.info("开始连接MySQL...")
try:
conn = create_db_connection()
logging.info("MySQL连接成功")
except Exception as e:
logging.error(f"MySQL连接失败:{str(e)}") # 记录数据库错误