1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
| import csv import pymysql from pymysql import OperationalError from datetime import datetime from html import unescape import requests from bs4 import BeautifulSoup import logging
target_items = [ "多晶硅 致密料 (RMB)", "多晶硅 颗粒料 (RMB)", "单晶N型硅片 - 182-183.75mm / 130µm (RMB)", "单晶N型硅片 - 182*210mm / 130µm (RMB)", "182*182-210mm 单晶TOPCon组件 (RMB)" ]
target_url = "https://www.infolink-group.com/spot-price/cn/"
mysql_config = { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "a123456", "database": "silicon_price_db", "charset": "utf8mb4", "connect_timeout": 5 }
headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36" }
def create_db_connection(): """创建MySQL连接(使用PyMySQL,适配MySQL 8.0+)""" connection = None try: print("\n【MySQL连接步骤1/3】尝试用PyMySQL登录本地MySQL...") print(f" 连接参数:host={mysql_config['host']}, port={mysql_config['port']}, user={mysql_config['user']}") print(" ⏳ 等待响应(超时5秒)...")
temp_conn = pymysql.connect( host=mysql_config["host"], port=mysql_config["port"], user=mysql_config["user"], password=mysql_config["password"], charset=mysql_config["charset"], connect_timeout=mysql_config["connect_timeout"] ) print("✅ PyMySQL登录成功(未指定数据库)")
print(f"【MySQL连接步骤2/3】创建数据库 {mysql_config['database']}...") with temp_conn.cursor() as cursor: cursor.execute(f"CREATE DATABASE IF NOT EXISTS {mysql_config['database']} CHARACTER SET utf8mb4;") temp_conn.commit() temp_conn.close() print(f"✅ 数据库 {mysql_config['database']} 已就绪")
print(f"【MySQL连接步骤3/3】连接到数据库 {mysql_config['database']}...") connection = pymysql.connect(** mysql_config) print("🎉 MySQL全流程连接成功!")
except OperationalError as e: error_code, error_msg = e.args print(f"\n❌ MySQL连接失败(错误码:{error_code}):{error_msg}") if error_code == 1045: print("💡 可能原因:密码错误或用户无TCP登录权限") print(" - 确认密码正确;") print(" - 登录MySQL执行:ALTER USER 'root'@'127.0.0.1' IDENTIFIED WITH mysql_native_password BY '你的密码';") elif error_code == 2003: print("💡 可能原因:MySQL服务未启动或端口被拦截") else: print("💡 参考解决方案:") print(" - 确保MySQL已重启(修改配置后需重启);") print(" - 检查MySQL错误日志(data目录下的.err文件)")
except Exception as e: print(f"\n❌ 连接过程异常:{str(e)}(可能是PyMySQL库未安装)") print(" 解决:执行 pip install pymysql 安装库")
return connection
def create_price_table(connection): """创建数据表(确保结构正确)""" if not connection: print("❌ 跳过建表:MySQL连接未建立") return False
create_sql = """ CREATE TABLE IF NOT EXISTS silicon_prices ( id INT AUTO_INCREMENT PRIMARY KEY, extract_time DATE NOT NULL, item_name VARCHAR(255) NOT NULL, high_price DECIMAL(10,3), low_price DECIMAL(10,3), avg_price DECIMAL(10,3), size_192 DECIMAL(10,3), size_193 DECIMAL(10,3), size_197 DECIMAL(10,3), UNIQUE KEY unique_record (extract_time, item_name) # 避免重复插入 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; """
try: with connection.cursor() as cursor: cursor.execute(create_sql) connection.commit() print("✅ 数据表silicon_prices已创建/验证") return True except Exception as e: print(f"❌ 建表失败:{str(e)}") return False
def insert_price_data(connection, results): """插入数据(处理无效值和重复数据)""" if not connection: print("❌ 跳过插入:MySQL连接未建立") return
valid_count = sum(1 for res in results if res["均价"] not in ["未找到", "--", ""]) if valid_count == 0: print("⚠️ 无有效数据可插入(检查网页提取是否正常)") return
insert_sql = """ INSERT IGNORE INTO silicon_prices ( extract_time, item_name, high_price, low_price, avg_price, size_192, size_193, size_197 ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """
data_list = [] for res in results: def to_float(val): if val in ["未找到", "--", "", None]: return None try: return float(val) except: return None
data_list.append(( res["提取时间"], res["项目名称"], to_float(res["高点"]), to_float(res["低点"]), to_float(res["均价"]), to_float(res["192"]), to_float(res["193"]), to_float(res["197"]) ))
try: with connection.cursor() as cursor: cursor.executemany(insert_sql, data_list) connection.commit() print(f"✅ 数据插入完成,新增{cursor.rowcount}条(0表示重复)") except Exception as e: connection.rollback() print(f"❌ 插入失败:{str(e)}")
def extract_web_data(): print("\n【网页数据提取】开始...") extract_time = datetime.now().strftime("%Y-%m-%d") results = [ { "提取时间": extract_time, "项目名称": item, "高点": "未找到", "低点": "未找到", "均价": "未找到", "192": "未找到", "193": "未找到", "197": "未找到" } for item in target_items ]
try: response = requests.get(target_url, headers=headers, timeout=5) response.raise_for_status() with open("网页源码.html", "w", encoding="utf-8") as f: f.write(response.text) print("✅ 网页源码已保存(调试用)")
soup = BeautifulSoup(response.text, "html.parser") target_divs = soup.find_all("div", class_="tb-wrap tb02") if not target_divs: print("❌ 未找到目标表格(div.tb-wrap.tb02)") return results
matched_count = 0 for div in target_divs: table = div.find("table") if not table: continue for row in [r for r in table.find_all("tr") if not r.find("th")]: tds = row.find_all("td") if len(tds) < 4: continue item_name = unescape(tds[0].get_text(strip=True)) for res in results: if res["项目名称"] == item_name: res["高点"] = tds[1].get_text(strip=True) res["低点"] = tds[2].get_text(strip=True) avg_str = tds[3].get_text(strip=True) res["均价"] = avg_str matched_count += 1 print(f" ✅ 匹配到:{item_name} → 均价:{avg_str}")
if avg_str not in ["未找到", "--", ""]: try: avg = float(avg_str) res["192"] = round(avg / 183.75 * 192.6+0.03, 3) res["193"] = round(avg / 182.2 * 183 / 187.75 * 193.6+0.09, 3) res["197"] = round(avg / 182.2 * 183 / 187.75 * 197.3+0.09, 3) except: print(f" ⚠️ 计算失败:{item_name}的均价不是有效数字") break
print(f"【网页数据提取】完成,匹配到{matched_count}/{len(target_items)}个项目")
except requests.exceptions.RequestException as e: print(f"【网页数据提取】失败:{str(e)}(网络错误)") except Exception as e: print(f"【网页数据提取】失败:{str(e)}(解析错误)") """ # 保存CSV备份(无论MySQL是否成功) csv_filename = f"硅片价格汇总_{extract_time}.csv" with open(csv_filename, "w", encoding="utf-8-sig", newline="") as f: writer = csv.DictWriter(f, fieldnames=results[0].keys()) writer.writeheader() writer.writerows(results) print(f"【网页数据提取】CSV已保存:{csv_filename}") """ return results
def main(): print("="*60) print(f"程序启动:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("="*60)
results = extract_web_data()
print("\n" + "="*60) print("开始MySQL操作...") conn = create_db_connection() if conn: create_price_table(conn) insert_price_data(conn, results) conn.close() print("✅ MySQL连接已关闭") else: print("❌ MySQL操作终止(连接未建立)")
print("\n" + "="*60) print("程序执行完成!") print("1. 网页数据已保存到CSV(即使MySQL失败)") print("2. MySQL状态:" + ("成功" if conn else "失败,请查看错误提示")) print("="*60)
if __name__ == "__main__": main()
logging.basicConfig( filename="C:\\task_execution.log", level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" )
logging.info("开始网页爬取...") try: response = requests.get(target_url, headers=headers, timeout=5) logging.info(f"网页请求成功,状态码:{response.status_code}") except Exception as e: logging.error(f"网页请求失败:{str(e)}")
logging.info("开始连接MySQL...") try: conn = create_db_connection() logging.info("MySQL连接成功") except Exception as e: logging.error(f"MySQL连接失败:{str(e)}")
|