【python读取微信消息并解析获取夸克网盘链接,并转存到自己网盘生成分享链接转发到指定人(或群)
python读取微信消息并解析获取夸克网盘链接,并转存到自己网盘生成分享链接转发到指定人(或群)
·
python读取微信消息并解析获取夸克网盘链接,并转存到自己网盘生成分享链接转发到指定人(或群)
- 下面是具体代码实现
# 导入
import re
import requests
import json
import urllib.parse
import time
import random
from wxauto import WeChat
# Pyinstaller -F 微信自动化2.py
# 如有更多定制需求 +V:duanduan981201 备注python定制
# 设置监听列表 用户或群列表
listen_list = []
headers = {
'cookie': '', # 授权令牌(例如:Bearer Token)
'Accept': 'application/json' # 指定客户端期望接受 JSON 格式响应
}
cookie = ''; # 夸克token
def get_file_id(pwd_id, stoken):
# 对 stoken 进行 URL 编码
encoded_stoken = urllib.parse.quote(stoken)
# 构造 URL
url = f"https://drive-h.quark.cn/1/clouddrive/share/sharepage/detail?pr=ucpro&fr=pc&uc_param_str=&pwd_id={pwd_id}&stoken={encoded_stoken}&pdir_fid=0&force=0&_page=1&_size=99999&_fetch_banner=1&_fetch_share=1&_fetch_total=1&_sort=file_type:asc,updated_at:desc&__dt=1032&__t={int(time.time() * 1000)}"
try:
# 发送请求
response = requests.get(url, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
# 解析响应体中的 JSON
result = response.json()
# 提取 "list" 数组
data = result.get("data", {})
file_list = data.get("list", [])
fid_list = []
fid_token_list = []
# 提取 fid 和 share_fid_token
for item in file_list:
fid = item.get("fid")
share_fid_token = item.get("share_fid_token")
if not fid or not share_fid_token:
raise ValueError("Failed to retrieve 'fid' or 'share_fid_token' from response")
fid_list.append(fid)
fid_token_list.append(share_fid_token)
return fid_list, fid_token_list
except requests.exceptions.RequestException as e:
# 网络请求相关的异常处理
print(f"Request failed: {e}")
return None, None
except ValueError as e:
# 解析数据相关的异常处理
print(f"Error parsing the response data: {e}")
return None, None
def random_pause():
# 设置随机暂停时间 (2-5秒之间)
pause_duration = random.randint(2, 5)
print(f"暂停 {pause_duration} 秒...")
time.sleep(pause_duration)
def save_file(pwd_id, stoken, to_pdir_fid, fid_list, fid_token_list):
req_body = {
"fid_list": fid_list,
"fid_token_list": fid_token_list,
"stoken": stoken,
"pwd_id": pwd_id,
"to_pdir_fid": to_pdir_fid,
}
# 构造 URL
url = f"https://drive-pc.quark.cn/1/clouddrive/share/sharepage/save?pr=ucpro&fr=pc&uc_param_str=&__dt=779577&__t={int(time.time() * 1000)}"
try:
# 发送请求
response = requests.post(url, json=req_body, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
data1 = response.json()
# 提取 data 部分
data2 = data1.get("data", {})
if not data2:
return None, "data 格式转换错误"
a = 0
random_pause()
while True:
task_id = data2.get("task_id")
if not task_id:
return None, "task_id 缺失"
# 获取任务状态
url = f"https://drive-pc.quark.cn/1/clouddrive/task?pr=ucpro&fr=pc&uc_param_str=&task_id={task_id}&retry_index=1&__dt=9565&__t={int(time.time() * 1000)}"
response = requests.get(url, headers=headers)
response.raise_for_status()
data3 = response.json()
code = data3.get("code")
message = data3.get("message")
if code == 41035 and message == "单次转存文件个数超出用户等级限制":
get_file_id999(pwd_id, stoken, 0)
fxList = recursive_save(pwd_id, stoken,to_pdir_fid,0)
if len(fxList) > 0 :
return fxList, None
return None, f"{message} -- 保存失败"
save_as = data3.get("data", {}).get("save_as")
if not save_as:
if a > 10:
break
random_pause()
print("save_as 格式转换错误")
a += 1
continue
save_as_top_fids_raw = save_as.get("save_as_top_fids")
if not save_as_top_fids_raw:
if a > 10:
break
random_pause()
print("save_as_top_fids 不存在或类型不正确")
a += 1
continue
# 转换为 list of strings
save_as_top_fids = [str(f) for f in save_as_top_fids_raw]
if save_as_top_fids:
print("获取 saveAsTopFids 成功..")
return save_as_top_fids, None
if a > 10:
break
random_pause()
print("saveAsTopFids 为空")
a += 1
return None, "获取保存后的文件失败"
except requests.exceptions.RequestException as e:
# 网络请求相关的异常处理
print(f"Request failed: {e}")
return None, str(e)
# 遍历存储文件
def recursive_save(pwd_id, stoken,to_pdir_fid,pdir_fid):
fxList = []
result = get_file_id999(pwd_id, stoken,pdir_fid)
# print(result)
if result.get("code") == 0:
list = result.get("data").get("list")
print(list)
folderList = [] # 文件夹
fileList = [] # 文件
for ls in list:
print(ls["dir"])
if ls["dir"] is False:
fileList.append(ls)
else:
folderList.append(ls)
if len(fileList) > 0 :
list = split_array_max_40(fileList)
fid_list = []
fid_token_list = []
for item in list:
for im in item :
print(im)
fid = im.get("fid")
share_fid_token = im.get("share_fid_token")
if not fid or not share_fid_token:
raise ValueError("Failed to retrieve 'fid' or 'share_fid_token' from response")
fid_list.append(im.get("fid"))
fid_token_list.append(im.get("share_fid_token"))
save_as_top_fids, error = save_file99(pwd_id,stoken,to_pdir_fid,fid_list,fid_token_list)
if pdir_fid == 0 :
if save_as_top_fids:
fxList = fxList + save_as_top_fids
else:
print("Error:", error)
for fol in folderList :
fid = fol.get("fid")
fileName = fol.get("file_name")
result = kuake_new_folder(fileName, to_pdir_fid)
if result:
print("Response:", result)
if result.get("code") == 0:
toPdirFid = result.get("data").get("fid")
print("创建文件夹成功 fid = " + toPdirFid)
if pdir_fid == 0:
fxList.append(toPdirFid)
# stoken = extracted(pwd_id)
recursive_save(pwd_id, stoken, toPdirFid, fid)
else:
print("创建文件夹失败")
if pdir_fid == 0 :
return fxList
def getListByPdirFid(pdirFid):
url = f"https://drive-pc.quark.cn/1/clouddrive/file/sort?pr=ucpro&fr=pc&uc_param_str=&pdir_fid={pdirFid}&_page=1&_size=2000&_fetch_total=1&_fetch_sub_dirs=0&_sort=file_type:asc,updated_at:desc"
try:
# 发送请求
response = requests.get(url, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
# 解析响应体中的 JSON
result = response.json()
return result
except requests.exceptions.RequestException as e:
# 网络请求相关的异常处理
print(f"Request failed: {e}")
return None
except ValueError as e:
# 解析数据相关的异常处理
print(f"Error parsing the response data: {e}")
return None
def get_file_id999(pwd_id, stoken,pdir_fid):
# 对 stoken 进行 URL 编码
encoded_stoken = urllib.parse.quote(stoken)
# 构造 URL
url = f"https://drive-h.quark.cn/1/clouddrive/share/sharepage/detail?pr=ucpro&fr=pc&uc_param_str=&pwd_id={pwd_id}&stoken={encoded_stoken}&pdir_fid={pdir_fid}&force=0&_page=1&_size=50&_fetch_banner=1&_fetch_share=1&_fetch_total=1&_sort=file_type:asc,updated_at:desc&__dt=1032&__t={int(time.time() * 1000)}"
try:
# 发送请求
response = requests.get(url, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
# 解析响应体中的 JSON
result = response.json()
return result
except requests.exceptions.RequestException as e:
# 网络请求相关的异常处理
print(f"Request failed: {e}")
return None
except ValueError as e:
# 解析数据相关的异常处理
print(f"Error parsing the response data: {e}")
return None
def split_array_max_40(arr):
"""
将数组 arr 分割成每个子数组的长度不超过 40。
:param arr: 输入的数组
:return: 分割后的数组列表
"""
result = []
max_size = 40
for i in range(0, len(arr), max_size):
result.append(arr[i:i + max_size])
return result
def save_file99(pwd_id, stoken, to_pdir_fid, fid_list, fid_token_list):
req_body = {
"fid_list": fid_list,
"fid_token_list": fid_token_list,
"stoken": stoken,
"pwd_id": pwd_id,
"to_pdir_fid": to_pdir_fid,
}
# 构造 URL
url = f"https://drive-pc.quark.cn/1/clouddrive/share/sharepage/save?pr=ucpro&fr=pc&uc_param_str=&__dt=779577&__t={int(time.time() * 1000)}"
try:
# 发送请求
response = requests.post(url, json=req_body, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
data1 = response.json()
# 提取 data 部分
data2 = data1.get("data", {})
if not data2:
return None, "data 格式转换错误"
a = 0
random_pause()
while True:
task_id = data2.get("task_id")
if not task_id:
return None, "task_id 缺失"
# 获取任务状态
url = f"https://drive-pc.quark.cn/1/clouddrive/task?pr=ucpro&fr=pc&uc_param_str=&task_id={task_id}&retry_index=1&__dt=9565&__t={int(time.time() * 1000)}"
response = requests.get(url, headers=headers)
response.raise_for_status()
data3 = response.json()
code = data3.get("code")
message = data3.get("message")
if code == 41035 and message == "单次转存文件个数超出用户等级限制":
return None, f"{message} -- 保存失败"
save_as = data3.get("data", {}).get("save_as")
if not save_as:
if a > 10:
break
random_pause()
print("save_as 格式转换错误")
a += 1
continue
save_as_top_fids_raw = save_as.get("save_as_top_fids")
if not save_as_top_fids_raw:
if a > 10:
break
random_pause()
print("save_as_top_fids 不存在或类型不正确")
a += 1
continue
# 转换为 list of strings
save_as_top_fids = [str(f) for f in save_as_top_fids_raw]
if save_as_top_fids:
print("获取 saveAsTopFids 成功..")
return save_as_top_fids, None
if a > 10:
break
random_pause()
print("saveAsTopFids 为空")
a += 1
return None, "获取保存后的文件失败"
except requests.exceptions.RequestException as e:
# 网络请求相关的异常处理
print(f"Request failed: {e}")
return None, str(e)
def fenxiang(fid_list):
send_body = {
"fid_list": fid_list,
"title": "",
"url_type": 1,
"expired_type": 1,
}
share_id = ""
a = 0
while True:
json_body = json.dumps(send_body)
url = "https://drive-pc.quark.cn/1/clouddrive/share?pr=ucpro&fr=pc&uc_param_str="
try:
response = requests.post(url, data=json_body, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
data1 = response.json()
task_resp = data1.get("data", {}).get("task_resp", {})
if task_resp:
share_id = task_resp.get("data", {}).get("share_id", "")
if share_id:
break
else:
print("获取 share_id 失败,等待重新获取...")
random_pause()
continue
else:
random_pause()
task_id = data1.get("data", {}).get("task_id", "")
if not task_id:
print("获取 task_id 失败,等待重新获取...")
random_pause()
continue
task_url = f"https://drive-pc.quark.cn/1/clouddrive/task?pr=ucpro&fr=pc&uc_param_str=&retry_index=1&task_id={task_id}"
response = requests.get(task_url, headers=headers)
response.raise_for_status()
data1 = response.json()
share_id = data1["data"].get("share_id", "")
except Exception as e: # 捕获所有异常并处理
print(f"请求过程中出错: {e}")
if a > 0:
return "", f"获取连接失败 share_id"
a += 1
random_pause()
continue
if not share_id:
if a > 0:
return "", f"获取连接失败 share_id"
a += 1
random_pause()
continue
break
a = 0
print("请求获取 shareId 成功,等待获取分享链接...")
while True:
url = "https://drive-pc.quark.cn/1/clouddrive/share/password?pr=ucpro&fr=pc&uc_param_str="
req_body = {
"share_id": share_id,
}
json_body = json.dumps(req_body)
try:
response = requests.post(url, data=json_body, headers=headers)
response.raise_for_status() # 如果响应码不是 200,会抛出异常
data1 = response.json()
status = data1.get("status", None)
if status != 200:
print(f"获取连接状态码不等于200, status: {status}")
if a > 10:
return "", "获取分享链接失败..."
a += 1
random_pause()
continue
share_url = data1["data"].get("share_url", "")
if not share_url:
print("获取转换的链接失败")
return "", None
print("获取分享的链接成功...")
return share_url, None
except requests.exceptions.RequestException as e:
if a > 10:
return "", "发送请求获取链接失败..."
print("发送请求获取链接失败...")
a += 1
random_pause()
continue
return "", None
# 创建新文件夹的函数
def kuake_new_folder(file_name, pdir_fid):
random_pause()
url = "https://drive-pc.quark.cn/1/clouddrive/file?pr=ucpro&fr=pc&uc_param_str="
# 自定义请求体参数
data = {
"pdir_fid": pdir_fid,
"file_name": file_name,
"dir_path": "",
"dir_init_lock": False,
}
# 将请求体序列化为 JSON 格式
json_data = json.dumps(data)
# 请求头
headers = {
"authority": "drive-pc.quark.cn",
"accept": "application/json, text/plain, */*",
"accept-language": "zh-CN,zh;q=0.9",
"content-type": "application/json",
"cookie": cookie, # 请替换为有效的 cookie
"origin": "https://pan.quark.cn",
"referer": "https://pan.quark.cn/list",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5414.121 Safari/537.36"
}
# 发送 POST 请求
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status() # 如果状态码不是 200,会抛出异常
# 读取响应内容
response_data = response.json()
# 返回响应数据
return response_data
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
except json.JSONDecodeError as e:
print(f"JSON 解析失败: {e}")
return None
def get_new_url(pwd_id):
print(pwd_id)
stoken = extracted(pwd_id)
print(stoken)
fid_list, fid_token_list = get_file_id(pwd_id,stoken)
to_pdir_fid = user_input #"390acd042c3d4ea7abb31ba830f64b64"
if fid_list is not None and fid_token_list is not None:
print("FID List:", fid_list)
print("FID Token List:", fid_token_list)
a = 0
while True :
save_as_top_fids, error = save_file(pwd_id, stoken, to_pdir_fid, fid_list, fid_token_list)
if save_as_top_fids:
print("SaveAsTopFids:", save_as_top_fids)
share_url, error = fenxiang(save_as_top_fids)
if share_url:
print("Share URL:", share_url)
return share_url
else:
print("Error:", error)
return "分享失败...."
else:
print("Error:", error)
if ( a > 10 ) :
return "分享失败...."
# """
# 模拟方法:传入提取的 URL 标识符,返回一个新的 URL。
# 这里的 pwd_id 就是从原 URL 中提取出来的标识符。
# """
# # 假设返回一个新 URL,实际可以根据业务逻辑进行处理
# return f"https://pan.quark.cn/s/{pwd_id}_new" # 假设新 URL 是在原标识符基础上加上 "_new"
def replace_urls_and_extract(text):
# 正则表达式:匹配 https://pan.quark.cn/s/ 后面的一段数据
url_pattern = r"https://pan\.quark\.cn/s/([a-zA-Z0-9]+)"
# 提取所有匹配的 URL 的唯一标识符
original_urls = re.findall(url_pattern, text)
# 打印出提取出来的原始 URL 标识符
print("提取出的原始 URL 标识符:")
for url in original_urls:
print(url)
# 使用正则表达式替换所有 URL
def replace_url(match):
# 获取原始 URL 标识符
pwd_id = match.group(1)
# 获取新 URL
new_url = get_new_url(pwd_id)
return new_url
# 替换文本中的所有 URL
updated_text = re.sub(url_pattern, replace_url, text)
# 打印出替换后的文本
print("\n替换后的文本:")
print(updated_text)
return updated_text
def extracted(pwd_id):
# 构造 URL
url = f"https://drive-h.quark.cn/1/clouddrive/share/sharepage/token?pr=ucpro&fr=pc&uc_param_str=&__dt=709&__t={int(time.time() * 1000)}"
# 构建请求体
body1 = {
"pwd_id": pwd_id,
"passcode": ""
}
try:
# 发送 POST 请求
response = requests.post(url, json=body1, headers=headers)
# 检查请求是否成功
response.raise_for_status() # 如果响应码是 4xx 或 5xx,抛出异常
# 解析响应 JSON
data = response.json()
# 提取 "stoken"
data_content = data.get("data", {})
stoken = data_content.get("stoken")
if not stoken:
raise ValueError("Invalid response structure: 'stoken' not found")
return stoken
except requests.exceptions.RequestException as e:
# 请求错误处理
print(f"Request failed: {e}")
return "", e
except ValueError as e:
# 解析错误处理
print(f"Error parsing response: {e}")
return "", e
def read_file_content(file_path):
while True:
try:
# 打开文件并读取内容
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# 判断文件是否为空
if not content.strip(): # 如果文件内容为空(去掉空白字符后为空)
print("文件内容为空,请检查文件内容。")
input("按回车键重新读取文件...")
else:
return content # 返回文件内容
except FileNotFoundError:
print(f"文件 {file_path} 未找到,请检查文件路径。")
break
except Exception as e:
print(f"发生错误: {e}")
break
updated_text = ""
while True:
try:
# 获取微信窗口对象
wx = WeChat()
# 循环添加监听对象
for i in listen_list:
wx.AddListenChat(who=i, savepic=False)
print("监听列表成功。。。")
target_link = "https://pan.quark.cn"
wait = 1 # 设置1秒查看一次是否有新消息
# 持续监听消息,并且收到消息后回复“收到”
while True:
try:
msgs = wx.GetListenMessage()
if updated_text:
wx.SendMsg(updated_text, '简一')
updated_text = ""
for chat in msgs:
who = chat.who # 获取聊天窗口名(人或群名)
one_msgs = msgs.get(chat) # 获取消息内容
# 回复收到
for msg in one_msgs:
msgtype = msg.type # 获取消息类型
content = msg.content # 获取消息内容,字符串类型的消息内容
print(f'【{who}】:{content}')
if target_link in content:
user_input = read_file_content("id.txt")
print("读取到的内容为:",user_input)
updated_text = replace_urls_and_extract(content)
with open('result.txt', 'a', encoding='utf-8') as file:
file.write(updated_text + "\n\n")
print("保存链接成功")
wx.SendMsg(updated_text, '简一')#发送给指定好友
updated_text = ""
else:
print("文本中未包含指定链接字符串")
time.sleep(wait) # 控制每秒查看一次消息
except Exception as e:
print(f"消息监听部分发生错误: {e}")
time.sleep(wait) # 如果出现错误,休眠一段时间后继续
except Exception as e:
print(f"获取微信窗口或初始化监听对象时发生错误: {e}")
print("重新初始化微信对象并继续执行...")
time.sleep(5) # 等待一段时间后重新执行
更多推荐
所有评论(0)