WxAutoX+大模型实现微信群信息智能化提取
本文探讨了在数字化办公和社群运营中,如何利用WxAutoX工具与大模型技术实现微信群信息的智能化提取。微信群作为信息交流的重要平台,包含大量如房源、车辆、二手商品等数据,这些信息的有效提取对企业至关重要。文章详细介绍了通过Python脚本和Java后台代码,结合大模型技术,实现群消息与图片的关联分析,并提取关键信息如手机号、车辆信息等。最终,通过大模型整理后的数据,能够高效地提交到企业系统中,提升信息处理效率。
·
在当今数字化办公和社群运营场景中,微信群已成为重要的信息交流平台。然而,海量的群消息中蕴含着宝贵的数据资源,如何高效提取、结构化这些信息一直是开发者面临的挑战。本文将介绍如何结合WxAutoX工具与大模型技术,实现微信群信息的智能化提取方案。
微信群中既有房源信息,也有车辆信息、二手商品信息、装修方案信息等等,已经成为企业获取信息的重要来源。如何结合WxAutoX与大模型实现信息的智能化分析提取呢?本文给出一个实用的解决方案。
客户端PYTHON脚本执行:实现群消息+群图片关联对应
# -*- coding: utf-8 -*-
import json
import re
import time
import concurrent.futures
import threading
import os
import requests
from wxautox import WeChat
import Test
import UploadImage
# Files used for persistence: an append-only run log and a JSON snapshot
# of the messages that are still waiting to be processed.
RECORD_FILE = 'record.txt'
BACKUP_FILE = 'back.txt'

# Module-level buffer of collected message records.
user_data_list = []

# Lock for thread-safe operations on the shared list.
list_lock = threading.Lock()
# Lock for thread-safe file writes.
file_lock = threading.Lock()

# Thread pool executor (at most 10 worker threads).
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
def write_to_record(content):
    """Append ``content`` followed by a newline to the record file.

    The write happens while holding ``file_lock``. Any failure is printed
    to stdout and otherwise ignored, so this function never raises.
    """
    try:
        with file_lock, open(RECORD_FILE, mode='a', encoding='utf-8') as record:
            record.write(content + '\n')
    except Exception as exc:
        print(f"写入记录文件失败: {exc}")
def load_backup_data():
    """Load the pending-message list from the backup file.

    Returns:
        The list decoded from ``BACKUP_FILE``. An empty list is returned
        when the file does not exist, is empty, cannot be read or parsed,
        or holds JSON that is not an array.
    """
    if not os.path.exists(BACKUP_FILE):
        return []
    try:
        with open(BACKUP_FILE, 'r', encoding='utf-8') as f:
            raw = f.read().strip()
        if not raw:
            return []
        data = json.loads(raw)
    except Exception as e:
        # Best effort: a damaged backup must not prevent start-up.
        print(f"加载备份文件失败: {str(e)}")
        return []
    # The caller uses the result as the shared message list (append, clear,
    # extend), so anything other than a list is treated as a corrupt backup.
    if not isinstance(data, list):
        print(f"加载备份文件失败: 备份内容不是列表 ({type(data).__name__})")
        return []
    return data
def save_backup_data(data):
    """Overwrite the backup file with ``data`` serialised as JSON.

    The file is rewritten while holding ``file_lock``; non-ASCII text is
    stored unescaped with a two-space indent. Any failure is printed to
    stdout and otherwise ignored.
    """
    try:
        with file_lock, open(BACKUP_FILE, mode='w', encoding='utf-8') as backup:
            json.dump(data, backup, ensure_ascii=False, indent=2)
    except Exception as exc:
        print(f"保存备份文件失败: {exc}")
def contains_phone_number(text):
    """Report whether ``text`` contains a mainland-China mobile number.

    A number is an 11-digit run that starts with ``1``, has ``3``-``9`` as
    its second digit, and is not directly preceded or followed by another
    digit.
    """
    mobile_re = re.compile(r'(?<!\d)(1[3-9]\d{9})(?!\d)')
    found = mobile_re.search(text)
    return found is not None
def send_http_request(user_data, url="http://localhost:8808/big/insert_user"):
    """POST one message record to the backend as JSON and log the outcome.

    Args:
        user_data: JSON-serialisable record to send as the request body.
        url: Endpoint to post to. The default is the address that was
            previously left commented out in this function; without any
            assignment every call failed with ``NameError`` and nothing
            was ever sent.

    The result (success, non-200 status, or the error text) is printed and
    appended to the record file. Exceptions are never propagated, so the
    function is safe to run on an executor thread.
    """
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    try:
        response = requests.post(
            url,
            data=json.dumps(user_data),
            headers=headers,
            timeout=3600  # seconds; deliberately generous (one hour)
        )
    except Exception as e:
        error_msg = f"请求过程中发生错误: {str(e)}"
        print(error_msg)
        write_to_record(error_msg)
        return

    if response.status_code == 200:
        msg = "请求成功!"
    else:
        msg = f"请求失败,状态码: {response.status_code}"
    print(msg)
    write_to_record(msg)
def process_message(who, group_nickname, content):
    """Buffer one chat message and flush the buffer once it holds 100 records.

    Args:
        who: Name of the chat the message came from.
        group_nickname: Display name of the sender.
        content: Message text (or whatever ``msg.content`` carried).

    Every message except the literal marker "以下是新消息" is appended to
    ``user_data_list`` and the list is backed up. When the list reaches 100
    entries the batch is:

    1. dumped to the record file,
    2. merged by ``Test.integrate_messages``,
    3. image paths ending in ``.png`` are uploaded via
       ``UploadImage.upload_to_oss`` and replaced by the returned URLs
       (each URL followed by a comma),
    4. each merged item is submitted to ``send_http_request`` on the
       executor, kept for the next batch, or dropped (see inline comments).

    The whole operation runs while holding ``list_lock``.
    """
    user_data = {
        "who": who,
        "sender": group_nickname,
        "pythonInfo": content,
        "time": time.time()
    }
    with list_lock:
        # Skip the "new messages below" marker; buffer everything else.
        if user_data["pythonInfo"] != "以下是新消息":
            user_data_list.append(user_data)
            print("我打印的", len(user_data_list))
            write_to_record(f"当前消息数量: {len(user_data_list)}")
            # Persist the buffer so a restart can resume from it.
            save_backup_data(user_data_list)

        # Nothing more to do until the batch threshold (100) is reached.
        if len(user_data_list) < 100:
            return

        record_content = "抓取后的微信对象信息*************************************************************************************************************"
        print(record_content)
        write_to_record(record_content)
        for item in user_data_list:
            # NOTE(review): the dump records the current time, not
            # item['time'] - confirm whether that is intended.
            captured = {
                "who": item['who'],
                "sender": item['sender'],
                "pythonInfo": item['pythonInfo'],
                "time": time.time()
            }
            item_str = json.dumps(captured, indent=4, ensure_ascii=False)
            print(item_str)
            write_to_record(item_str)

        # Merge related messages (text + pictures) into combined items.
        finalDataList = Test.integrate_messages(user_data_list)

        # Upload local PNG files and replace the paths with their URLs.
        for item in finalDataList:
            if "imgRul" in item:
                if item["imgRul"] is not None:
                    png_list = [png.strip() for png in item["imgRul"].split(",")
                                if png.strip().endswith(".png")]
                    uploaded = [
                        str(UploadImage.upload_to_oss(path.replace("\\", "/"))['url'])
                        for path in png_list
                    ]
                    # Each URL keeps a trailing comma, as before.
                    item["imgRul"] = "".join(link + "," for link in uploaded)
            else:
                # Guarantee the key exists for the lookups below.
                item["imgRul"] = None

        # Header for the post-upload section of the record file. Previously
        # the stale "captured" header was written here a second time.
        record_content = "上传到阿里云OSS后的对象信息*************************************************************************************************************"
        write_to_record(record_content)

        remaining_items = []
        for item in finalDataList:
            payload = {
                "who": item['who'],
                "sender": item['sender'],
                "pythonInfo": item['pythonInfo'],
                "imgRul": item['imgRul'],
                "time": item['time']
            }
            item_str = json.dumps(payload, indent=4, ensure_ascii=False)
            write_to_record(item_str)

            if (contains_phone_number(item['pythonInfo']) or contains_phone_number(item['sender'])) and \
                    (item['imgRul'] is not None and
                     "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic" in item['imgRul']):
                # Phone number and uploaded picture present: submit now.
                print("图片提交成功的对象到阿里云", item_str)
                executor.submit(send_http_request, payload)
                continue

            has_phone = (contains_phone_number(item['pythonInfo'])
                         or contains_phone_number(item['sender']))
            time_diff = time.time() - payload["time"]
            if time_diff > 300:
                # Older than five minutes: stop waiting for a picture and
                # submit the text alone, provided it carries a phone number.
                print("时间差超过5分钟,发起此条信息文本请求")
                if has_phone:
                    executor.submit(send_http_request, payload)
            elif has_phone:
                # Recent and carries a phone number: keep it for the next
                # batch. Items without a phone number are dropped.
                remaining_items.append(item)

        # Replace the buffer with the carried-over items and back it up.
        user_data_list.clear()
        user_data_list.extend(remaining_items)
        save_backup_data(user_data_list)
if __name__ == '__main__':
    # Restore any messages that were buffered before the last shutdown.
    backup_data = load_backup_data()
    if backup_data:
        with list_lock:
            user_data_list = backup_data
        msg = f"从备份文件恢复了 {len(user_data_list)} 条数据"
        print(msg)
        write_to_record(msg)

    wx = WeChat()
    wait = 0.01  # seconds between polls for new messages
    # Flags passed positionally to GetNextNewMessage, in this order.
    save_pic = True
    save_video = False
    save_file = False
    save_voice = False
    parse_url = False
    try:
        while True:
            msgs = wx.GetNextNewMessage(save_pic, save_video, save_file, save_voice, parse_url)
            if msgs:
                # Pair each chat name with its own messages. Taking the chat
                # name from a separate loop attributed messages to the wrong
                # chat whenever more than one chat was returned.
                for who, one_msgs in msgs.items():
                    for chat_msg in one_msgs:
                        content = chat_msg.content
                        # Fall back to a default nickname when the message
                        # object has no sender_remark attribute.
                        group_nickname = getattr(chat_msg, 'sender_remark', "默认昵称")
                        process_message(who, group_nickname, content)
            time.sleep(wait)
    except KeyboardInterrupt:
        print("程序被用户中断")
        write_to_record("程序被用户中断")
    finally:
        # Let queued HTTP submissions finish, then persist the buffer.
        executor.shutdown(wait=True)
        save_backup_data(user_data_list)
        msg = "线程池已关闭,数据已备份"
        print(msg)
        write_to_record(msg)
后台JAVA代码
package com.black.controller;
import com.alibaba.fastjson.JSON;
import com.black.pojo.Send;
import com.black.pojo.User;
import com.black.pojo.VehicleTransaction;
import com.black.util.*;
import com.google.gson.Gson;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * Receives one collected chat record, asks the large model to structure it,
 * and forwards records the model classifies as car buy/sell news to the
 * endpoint configured in {@code url.txt}.
 */
@RestController
@RequestMapping("/big")
public class BigController {
    Gson gson = new Gson();

    /**
     * Structures the submitted record with the model and, when the model
     * marks it as car buy/sell news, posts the result to the configured URL.
     *
     * @param user the record sent by the client (group, sender, text, images)
     * @return a success response whose {@code douBaoRes} entry is either the
     *         outgoing {@link Send} object or a notice that the data was ignored
     * @throws Exception if the model call, JSON parsing, or the outgoing HTTP
     *                   request fails, or no target URL is configured
     */
    @PostMapping("/insert_user")
    public Res insert_user(@RequestBody User user) throws Exception {
        HashMap<Object, Object> hashMap = new HashMap<>();
        String douBaoRes = DouBaoModel.modelDoWork(buildQuestion(user));
        System.out.println(douBaoRes);
        VehicleTransaction vehicleTransaction = gson.fromJson(douBaoRes, VehicleTransaction.class);
        // Gson yields null for empty input; treat that like any other
        // record the model could not classify instead of failing with an NPE.
        if (vehicleTransaction == null) {
            System.out.println("非法的数据,不用关心");
            hashMap.put("douBaoRes", "非法的数据,不用关心");
            return Res.success(hashMap);
        }

        Send send = buildSend(user, vehicleTransaction);
        System.err.println("最终请求出口数据:" + JSON.toJSON(send));

        // Constant-first equals: the model may omit the field entirely.
        if (!"true".equals(vehicleTransaction.getNewsOfBuyingOrReceivingACar())) {
            System.out.println("非法的数据,不用关心");
            hashMap.put("douBaoRes", "非法的数据,不用关心");
            return Res.success(hashMap);
        }

        String urlValue = readTargetUrl();
        if (urlValue.isEmpty()) {
            throw new IllegalStateException("url.txt 中未配置有效的提交地址");
        }
        postJson(urlValue, JSON.toJSON(send).toString());

        hashMap.put("douBaoRes", send);
        return Res.success(hashMap);
    }

    /** Builds the model prompt: role, required JSON shape, rules, then the record itself. */
    private String buildQuestion(User user) {
        // The format sample now says "true/false"; it previously read
        // "ture/false", which contradicts the "true" check in insert_user.
        return "一、你的角色是:二手车信息分析专家。"
                + "\n二、你的任务是:分析输入的二手车信息,严格按照此JSON格式返回结果{\"newsOfBuyingOrReceivingACar\":\"true/false\",\"isTheSenderTypeTheRecipientOrTheSeller\":\"xxx\",\"informationRelatedToTheTypeOfVehicleReceived\":\"xxx\",\"province\":\"xxx\",\"city\":\"xxx\",\"phoneInformation\":\"xxx\",\"vehicleBrandAndModel\":\"xxx\",\"vehiclePrice\":\"xxx\"}"
                + "\n三、我的要求:"
                + "(1)重点分析四个方面并返回中文数据。发送方类型为收车方还是卖车方、收车类型相关信息、地区信息、联系方式。"
                + "(2)如果发送信息中有手机号phoneInformation返回发送信息中手机号,如果没有则phoneInformation返回发送者的手机号。"
                + "(3)省市信息要根据发送信息规范为常规表达,如安徽省 合肥市这种。"
                + "(4)informationRelatedToTheTypeOfVehicleReceived要求返回发送信息的全部内容。每个车型、公里数等信息按照车型区分用\n隔开。"
                + "(5)vehicleBrandAndModel如果多个则取第一个即可。"
                + "(6)vehiclePrice如果多个则取第一个即可和上面的品牌型号对应。单位统一为万元。"
                + "\n四、根据以上要求,我的输入是:"
                + "群名称:" + user.getWho() + "。发送者:" + user.getSender() + "。\n"
                + "发送信息如下:\n" + user.getPythonInfo();
    }

    /** Maps the client record and the model's answer onto the outgoing payload. */
    private Send buildSend(User user, VehicleTransaction vehicleTransaction) {
        Send send = new Send();
        send.setWxGroup(user.getWho());
        send.setNickName(MyUtil.removePhoneNumbers(user.getSender()));
        send.setPhoneNumber(vehicleTransaction.getPhoneInformation());
        send.setProvince(vehicleTransaction.getProvince());
        send.setCity(vehicleTransaction.getCity());
        send.setCarModel(vehicleTransaction.getVehicleBrandAndModel());
        // A missing or empty price is sent as null.
        if (vehicleTransaction.getVehiclePrice() != null && !vehicleTransaction.getVehiclePrice().isEmpty()) {
            send.setPrice(vehicleTransaction.getVehiclePrice());
        } else {
            send.setPrice(null);
        }
        send.setImgRul(user.getImgRul());
        // Type 1 = seller, type 2 = buyer; any other answer leaves it unset.
        if ("卖车方".equals(vehicleTransaction.getIsTheSenderTypeTheRecipientOrTheSeller())) {
            send.setType(1);
        } else if ("收车方".equals(vehicleTransaction.getIsTheSenderTypeTheRecipientOrTheSeller())) {
            send.setType(2);
        }
        send.setInfo(vehicleTransaction.getInformationRelatedToTheTypeOfVehicleReceived());
        return send;
    }

    /** Returns the last non-blank line of url.txt, or an empty string if none could be read. */
    private String readTargetUrl() {
        String urlValue = "";
        try (BufferedReader reader = new BufferedReader(new FileReader("url.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Ignore blank lines so a trailing newline cannot erase the URL.
                if (!line.trim().isEmpty()) {
                    urlValue = line.trim();
                }
            }
        } catch (Exception e) {
            System.err.println("读取文件时出错: " + e.getMessage());
        }
        return urlValue;
    }

    /** Posts a UTF-8 JSON body to the given URL, drains the response, and always disconnects. */
    private void postJson(String urlValue, String body) throws Exception {
        URL apiUrl = new URL(urlValue);
        HttpURLConnection connection = (HttpURLConnection) apiUrl.openConnection();
        try {
            connection.setRequestMethod("POST");
            // The charset must be given as a "charset=" parameter.
            connection.setRequestProperty("Content-Type", "application/json; charset=utf-8");
            connection.setRequestProperty("Accept", "application/json");
            connection.setDoOutput(true);
            try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
                byte[] input = body.getBytes(StandardCharsets.UTF_8);
                wr.write(input, 0, input.length);
            }
            int responseCode = connection.getResponseCode();
            System.out.println("Response Code: " + responseCode);
            // Read the response to completion; the content itself is not used.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder response = new StringBuilder();
                String responseLine;
                while ((responseLine = br.readLine()) != null) {
                    response.append(responseLine.trim());
                }
            }
        } finally {
            // Release the connection even when the request fails.
            connection.disconnect();
        }
    }
}
实现效果
{
"who": "XXX",
"sender": "XXX",
"pythonInfo": "【一口价】5.6万💰\n【车辆款型】大众\n【上牌日期】2022-6\n【车辆配置】大众朗逸1.5自动风尚版\n【车辆排量】1.5LT\n【真实公里】6-10万公里\n【车辆颜色】白\n【详细车况】新到一批租赁户,一手过公司待销售\n22年6月份上牌大众朗逸1.5自动风尚版,\n公里6万到10万公里不等\n批发价5.7万贵州场地\n【过户次数】1\n【车所在地】贵州\n【联系电话】XXX",
"imgRul": "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic/20250521160808_1d68dbd8.png,",
"time": 1747813316.0353506
}
大模型整理后数据
{
"phoneNumber": "18205072893",
"province": "江苏省",
"city": "扬州市",
"nickName": "批发二手车张 ",
"price": "6X.X万元",
"wxGroup": "及时雨名车内部交流群中南",
"imgRul": "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic/20250521210932_5a3666b8.png,",
"pwd": "xxx",
"type": 1,
"carModel": "宾利飞驰",
"info": "【车辆款型】宾利飞驰\n【上牌日期】2014.08\n【出厂日期】2013\n【指 导 价】405.8万\n【汽车排量】6.0T\n【排放标准】国5\n【车辆里程】10万\n【车辆颜色】深蓝外米内\n【车辆配置】4.0T V8标准版\n【详细车况】原版车况\n【过户次数】5次"
}
更多推荐
所有评论(0)