在当今数字化办公和社群运营场景中,微信群已成为重要的信息交流平台。然而,海量的群消息中蕴含着宝贵的数据资源,如何高效提取、结构化这些信息一直是开发者面临的挑战。本文将介绍如何结合WxAutoX工具与大模型技术,实现微信群信息的智能化提取方案。

微信群中既有房源信息,也有车辆信息、二手商品信息、装修方案信息等等,已经成为企业获取信息的重要来源。如何结合WxAutoX与大模型实现信息的智能化分析提取?本文给出一个实用的解决方案。

客户端PYTHON脚本执行:实现群消息+群图片关联对应

# -*- coding: utf-8 -*-
import json
import re
import time
import concurrent.futures
import threading
import os

import requests
from wxautox import WeChat

import Test
import UploadImage

# Buffer of captured message dicts, defined at module scope and shared by the functions below
user_data_list = []
# Lock used to keep operations on the shared list thread-safe
list_lock = threading.Lock()

# Thread pool executor (at most 10 worker threads)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# Lock used to keep file writes thread-safe
file_lock = threading.Lock()

# Backup file name
BACKUP_FILE = 'back.txt'
# Record (log) file name
RECORD_FILE = 'record.txt'


def write_to_record(content):
    """Append ``content`` followed by a newline to the record file.

    The file is opened and written while holding ``file_lock``. Any
    exception raised along the way is caught and reported on stdout
    instead of being propagated to the caller.
    """
    try:
        with file_lock, open(RECORD_FILE, mode='a', encoding='utf-8') as record:
            record.write(content + '\n')
    except Exception as exc:
        print(f"写入记录文件失败: {exc}")


def load_backup_data():
    """Return the JSON content of the backup file, or ``[]`` as a fallback.

    An empty list is returned when the backup file does not exist, when
    it contains only whitespace, or when reading/parsing fails (the
    failure is printed, not raised). Otherwise the decoded JSON value is
    returned unchanged.
    """
    if not os.path.exists(BACKUP_FILE):
        return []

    try:
        with open(BACKUP_FILE, mode='r', encoding='utf-8') as backup:
            raw = backup.read().strip()
        if raw:
            return json.loads(raw)
    except Exception as exc:
        print(f"加载备份文件失败: {exc}")
    return []


def save_backup_data(data):
    """Persist ``data`` to the backup file as pretty-printed UTF-8 JSON.

    The JSON is first written to a sibling temporary file and then moved
    over the backup file with ``os.replace``. The backup is rewritten very
    frequently, so writing it in place would leave a truncated, unparsable
    file if the process died mid-write; with the swap, the previous backup
    stays intact until the new one is complete.

    The whole operation runs under ``file_lock``. Failures are printed and
    swallowed so that a backup problem never interrupts message handling.

    Args:
        data: JSON-serialisable object to store.
    """
    try:
        temp_path = BACKUP_FILE + '.tmp'
        with file_lock:
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            # Swap only after the temp file has been fully written and closed.
            os.replace(temp_path, BACKUP_FILE)
    except Exception as e:
        print(f"保存备份文件失败: {str(e)}")


# Mainland-China mobile number: 11 digits, first digit 1, second digit 3-9,
# and not embedded inside a longer run of digits.
_PHONE_PATTERN = re.compile(r'(?<!\d)(1[3-9]\d{9})(?!\d)')


def contains_phone_number(text):
    """Return True if ``text`` contains a mainland-China mobile number.

    Args:
        text: The string to scan. Non-string values (for example ``None``
            for a missing nickname) are treated as containing no number
            instead of raising ``TypeError``.

    Returns:
        bool: True when an 11-digit mobile number is found, else False.
    """
    if not isinstance(text, str):
        return False
    return _PHONE_PATTERN.search(text) is not None


def send_http_request(user_data):
    """POST one message record to the backend as JSON and log the outcome.

    The result (success, non-200 status, or the exception text) is printed
    and appended to the record file; nothing is raised to the caller, so
    the function is safe to run as a fire-and-forget thread-pool task.

    Args:
        user_data: JSON-serialisable dict describing one message.
    """
    # Backend endpoint that receives the records. It must be bound here:
    # without it ``requests.post`` is never reached and every call ends in
    # the error branch below.
    url = "http://localhost:8808/big/insert_user"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    try:
        response = requests.post(
            url,
            data=json.dumps(user_data),
            headers=headers,
            # Very generous timeout; presumably the backend answers only
            # after its model call finishes - TODO confirm.
            timeout=3600
        )

        if response.status_code == 200:
            msg = "请求成功!"
        else:
            msg = f"请求失败,状态码: {response.status_code}"

        print(msg)
        write_to_record(msg)

    except Exception as e:
        error_msg = f"请求过程中发生错误: {str(e)}"
        print(error_msg)
        write_to_record(error_msg)


# Number of buffered messages that triggers a processing pass.
_BATCH_SIZE = 100
# A message that has a phone number but no uploaded image yet is held back
# until it is older than this many seconds, then submitted as text only.
_MAX_IMAGE_WAIT_SECONDS = 300
# Substring that marks an image URL as having been uploaded to OSS.
_OSS_URL_PREFIX = "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic"
# Placeholder content that is never buffered.
_NEW_MESSAGE_MARKER = "以下是新消息"
# Section headers written to the record file before each dump of items.
_RAW_HEADER = "抓取后的微信对象信息*************************************************************************************************************"
_UPLOADED_HEADER = "上传到阿里云OSS后的对象信息*************************************************************************************************************"


def _upload_item_images(item):
    """Replace the local ``.png`` paths in ``item['imgRul']`` with uploaded URLs.

    ``imgRul`` is a comma-separated string. Entries not ending in ``.png``
    are dropped; each remaining path is uploaded and the resulting URLs are
    joined with a trailing comma after every URL. A missing key is
    normalised to ``None``; an existing ``None`` is left untouched.
    """
    if "imgRul" not in item:
        item["imgRul"] = None
        return
    if item["imgRul"] is None:
        return

    local_paths = [
        path.strip()
        for path in item["imgRul"].split(",")
        if path.strip().endswith(".png")
    ]
    uploads = [
        UploadImage.upload_to_oss(path.replace("\\", "/"))
        for path in local_paths
    ]
    item["imgRul"] = "".join(f"{result['url']}," for result in uploads)


def _process_batch():
    """Integrate, upload and dispatch everything in ``user_data_list``.

    Must be called with ``list_lock`` held. Afterwards the shared list
    contains only the items that are still waiting for their images, and
    the backup file reflects that state.
    """
    print(_RAW_HEADER)
    write_to_record(_RAW_HEADER)
    for item in user_data_list:
        snapshot = {
            "who": item['who'],
            "sender": item['sender'],
            "pythonInfo": item['pythonInfo'],
            # Log the time the message was captured, not the time of logging.
            "time": item['time']
        }
        item_str = json.dumps(snapshot, indent=4, ensure_ascii=False)
        print(item_str)
        write_to_record(item_str)

    merged_items = Test.integrate_messages(user_data_list)
    for item in merged_items:
        _upload_item_images(item)

    # Label the second dump with its own header so the record file
    # distinguishes raw items from post-upload items.
    write_to_record(_UPLOADED_HEADER)

    remaining_items = []
    for item in merged_items:
        payload = {
            "who": item['who'],
            "sender": item['sender'],
            "pythonInfo": item['pythonInfo'],
            "imgRul": item['imgRul'],
            "time": item['time']
        }
        item_str = json.dumps(payload, indent=4, ensure_ascii=False)
        write_to_record(item_str)

        has_phone = (contains_phone_number(item['pythonInfo'])
                     or contains_phone_number(item['sender']))
        image_urls = item['imgRul']

        if has_phone and image_urls is not None and _OSS_URL_PREFIX in image_urls:
            # Phone number and uploaded image are both present: submit now.
            print("图片提交成功的对象到阿里云", item_str)
            executor.submit(send_http_request, payload)
        elif time.time() - payload["time"] > _MAX_IMAGE_WAIT_SECONDS:
            # Waited long enough for an image; submit the text on its own.
            print("时间差超过5分钟,发起此条信息文本请求")
            if has_phone:
                executor.submit(send_http_request, payload)
        elif has_phone:
            # Recent and potentially useful: keep it for the next pass.
            # Items without a phone number are dropped.
            remaining_items.append(item)

    user_data_list.clear()
    user_data_list.extend(remaining_items)
    save_backup_data(user_data_list)


def process_message(who, group_nickname, content):
    """Buffer one incoming message and process the buffer once it is full.

    The message is appended to the shared ``user_data_list`` (unless its
    content is the "new messages below" placeholder) and the list is
    backed up. When the list holds at least ``_BATCH_SIZE`` entries, the
    whole batch is integrated, its images are uploaded, and qualifying
    items are submitted to the backend through the thread pool.

    All of this runs while holding ``list_lock``.

    Args:
        who: Name of the chat the message came from.
        group_nickname: Display name of the sender.
        content: Message content.
    """
    user_data = {
        "who": who,
        "sender": group_nickname,
        "pythonInfo": content,
        "time": time.time()
    }

    with list_lock:
        if content != _NEW_MESSAGE_MARKER:
            user_data_list.append(user_data)
            print("我打印的", len(user_data_list))
            write_to_record(f"当前消息数量: {len(user_data_list)}")
            # Back up after every accepted message.
            save_backup_data(user_data_list)

        if len(user_data_list) < _BATCH_SIZE:
            return
        _process_batch()


if __name__ == '__main__':
    # Restore any messages persisted by a previous run.
    backup_data = load_backup_data()
    if backup_data:
        with list_lock:
            user_data_list = backup_data
            msg = f"从备份文件恢复了 {len(user_data_list)} 条数据"
            print(msg)
            write_to_record(msg)
    wx = WeChat()
    wait = 0.01  # Poll for new messages every 0.01 seconds
    save_pic = True
    save_video = False
    save_file = False
    save_voice = False
    parse_url = False

    try:
        while True:
            msgs = wx.GetNextNewMessage(save_pic, save_video, save_file, save_voice, parse_url)
            # ``msgs`` is handled as a mapping of chat name -> list of
            # message objects; an empty or missing result is skipped.
            if msgs:
                for chat, chat_msgs in msgs.items():
                    for msg in chat_msgs:
                        content = msg.content
                        # Messages without a sender remark fall back to a
                        # default nickname.
                        group_nickname = getattr(msg, 'sender_remark', "默认昵称")
                        # Attribute each message to the chat it belongs to.
                        process_message(chat, group_nickname, content)
            time.sleep(wait)

    except KeyboardInterrupt:
        print("程序被用户中断")
        write_to_record("程序被用户中断")
    finally:
        # Let queued HTTP submissions finish before exiting.
        executor.shutdown(wait=True)
        # Persist whatever is still buffered.
        save_backup_data(user_data_list)
        msg = "线程池已关闭,数据已备份"
        print(msg)
        write_to_record(msg)

后台JAVA代码

package com.black.controller;

import com.alibaba.fastjson.JSON;
import com.black.pojo.Send;
import com.black.pojo.User;
import com.black.pojo.VehicleTransaction;
import com.black.util.*;
import com.google.gson.Gson;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Receives chat messages from the Python client, asks the language model to
 * turn each one into structured vehicle-trade data, and forwards messages
 * the model classifies as buy/sell news to a downstream HTTP endpoint.
 */
@RestController
@RequestMapping("/big")
public class BigController {
    /** File whose last non-blank line is the downstream endpoint URL. */
    private static final String URL_FILE = "url.txt";
    /** Marker returned to the client for messages that are not forwarded. */
    private static final String INVALID_DATA = "非法的数据,不用关心";

    Gson gson = new Gson();

    /**
     * Analyses one message and, if it is vehicle buy/sell news, forwards it.
     *
     * @param user the message posted by the client (chat name, sender, text, image URLs)
     * @return a success response whose {@code douBaoRes} entry is either the
     *         forwarded {@code Send} object or the invalid-data marker
     * @throws Exception if the model call, JSON parsing, or the downstream
     *                   HTTP request fails
     */
    @PostMapping("/insert_user")
    public Res insert_user(@RequestBody User user) throws Exception {
        HashMap<Object, Object> hashMap = new HashMap<>();

        String douBaoRes = DouBaoModel.modelDoWork(buildQuestion(user));
        System.out.println(douBaoRes);
        VehicleTransaction vehicleTransaction = gson.fromJson(douBaoRes, VehicleTransaction.class);
        if (vehicleTransaction == null) {
            // Gson yields null for an empty reply; nothing can be extracted from it.
            System.err.println("大模型返回内容为空,无法解析");
            hashMap.put("douBaoRes", INVALID_DATA);
            return Res.success(hashMap);
        }

        Send send = toSend(user, vehicleTransaction);
        System.err.println("最终请求出口数据:" + JSON.toJSON(send));

        // Constant-first equals: a field missing from the model's reply counts as "not true".
        if (!"true".equals(vehicleTransaction.getNewsOfBuyingOrReceivingACar())) {
            System.out.println(INVALID_DATA);
            hashMap.put("douBaoRes", INVALID_DATA);
            return Res.success(hashMap);
        }

        postJson(readTargetUrl(), JSON.toJSON(send).toString());
        hashMap.put("douBaoRes", send);
        return Res.success(hashMap);
    }

    /** Builds the model prompt: role, required JSON shape, rules, then the message itself. */
    private String buildQuestion(User user) {
        return "一、你的角色是:二手车信息分析专家。"
                // The flag placeholder must read "true/false": the handler compares the reply with "true".
                + "\n二、你的任务是:分析输入的二手车信息,严格按照此JSON格式返回结果{\"newsOfBuyingOrReceivingACar\":\"true/false\",\"isTheSenderTypeTheRecipientOrTheSeller\":\"xxx\",\"informationRelatedToTheTypeOfVehicleReceived\":\"xxx\",\"province\":\"xxx\",\"city\":\"xxx\",\"phoneInformation\":\"xxx\",\"vehicleBrandAndModel\":\"xxx\",\"vehiclePrice\":\"xxx\"}"
                + "\n三、我的要求:"
                + "(1)重点分析四个方面并返回中文数据。发送方类型为收车方还是卖车方、收车类型相关信息、地区信息、联系方式。"
                + "(2)如果发送信息中有手机号phoneInformation返回发送信息中手机号,如果没有则phoneInformation返回发送者的手机号。"
                + "(3)省市信息要根据发送信息规范为常规表达,如安徽省 合肥市这种。"
                + "(4)informationRelatedToTheTypeOfVehicleReceived要求返回发送信息的全部内容。每个车型、公里数等信息按照车型区分用\n隔开。"
                + "(5)vehicleBrandAndModel如果多个则取第一个即可。"
                + "(6)vehiclePrice如果多个则取第一个即可和上面的品牌型号对应。单位统一为万元。"
                + "\n四、根据以上要求,我的输入是:"
                + "群名称:" + user.getWho() + "。发送者:" + user.getSender() + "。\n"
                + "发送信息如下:\n" + user.getPythonInfo();
    }

    /** Maps the incoming message and the model's analysis onto the outgoing payload. */
    private Send toSend(User user, VehicleTransaction vehicleTransaction) {
        Send send = new Send();
        send.setWxGroup(user.getWho());
        send.setNickName(MyUtil.removePhoneNumbers(user.getSender()));
        send.setPhoneNumber(vehicleTransaction.getPhoneInformation());
        send.setProvince(vehicleTransaction.getProvince());
        send.setCity(vehicleTransaction.getCity());
        send.setCarModel(vehicleTransaction.getVehicleBrandAndModel());
        // A missing or empty price is sent as null.
        if (vehicleTransaction.getVehiclePrice() != null && !vehicleTransaction.getVehiclePrice().isEmpty()) {
            send.setPrice(vehicleTransaction.getVehiclePrice());
        } else {
            send.setPrice(null);
        }

        send.setImgRul(user.getImgRul());

        // Type 1 = seller, 2 = buyer; any other (or missing) value leaves the type unset.
        if ("卖车方".equals(vehicleTransaction.getIsTheSenderTypeTheRecipientOrTheSeller())) {
            send.setType(1);
        } else if ("收车方".equals(vehicleTransaction.getIsTheSenderTypeTheRecipientOrTheSeller())) {
            send.setType(2);
        }
        send.setInfo(vehicleTransaction.getInformationRelatedToTheTypeOfVehicleReceived());
        return send;
    }

    /**
     * Returns the last non-blank line of the URL file, trimmed, or an empty
     * string if the file cannot be read (the error is logged).
     */
    private String readTargetUrl() {
        String urlValue = "";
        try (BufferedReader reader = new BufferedReader(new FileReader(URL_FILE))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Skip blank lines so a trailing empty line cannot wipe out the URL.
                if (!line.trim().isEmpty()) {
                    urlValue = line.trim();
                }
            }
        } catch (Exception e) {
            System.err.println("读取文件时出错: " + e.getMessage());
        }
        return urlValue;
    }

    /**
     * POSTs {@code body} as UTF-8 JSON to {@code urlValue}, logs the response
     * code and drains the response. The connection is always disconnected,
     * including when the request fails.
     */
    private void postJson(String urlValue, String body) throws Exception {
        URL apiUrl = new URL(urlValue);
        HttpURLConnection connection = (HttpURLConnection) apiUrl.openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json; charset=utf-8");
            connection.setRequestProperty("Accept", "application/json");
            // Enable sending a request body.
            connection.setDoOutput(true);
            try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
                byte[] input = body.getBytes(StandardCharsets.UTF_8);
                wr.write(input, 0, input.length);
            }

            int responseCode = connection.getResponseCode();
            System.out.println("Response Code: " + responseCode);

            // Read the response to the end; its content is not used.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                while (br.readLine() != null) {
                    // discard
                }
            }
        } finally {
            connection.disconnect();
        }
    }
}

实现效果

{
    "who": "XXX",
    "sender": "XXX",
    "pythonInfo": "【一口价】5.6万💰\n【车辆款型】大众\n【上牌日期】2022-6\n【车辆配置】大众朗逸1.5自动风尚版\n【车辆排量】1.5LT\n【真实公里】6-10万公里\n【车辆颜色】白\n【详细车况】新到一批租赁户,一手过公司待销售\n22年6月份上牌大众朗逸1.5自动风尚版,\n公里6万到10万公里不等\n批发价5.7万贵州场地\n【过户次数】1\n【车所在地】贵州\n【联系电话】XXX",
    "imgRul": "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic/20250521160808_1d68dbd8.png,",
    "time": 1747813316.0353506
}

 大模型整理后数据

{
  "phoneNumber": "18205072893",
  "province": "江苏省",
  "city": "扬州市",
  "nickName": "批发二手车张 ",
  "price": "6X.X万元",
  "wxGroup": "及时雨名车内部交流群中南",
  "imgRul": "https://chelaike.oss-cn-beijing.aliyuncs.com/wx_clue_pic/20250521210932_5a3666b8.png,",
  "pwd": "xxx",
  "type": 1,
  "carModel": "宾利飞驰",
  "info": "【车辆款型】宾利飞驰\n【上牌日期】2014.08\n【出厂日期】2013\n【指  导  价】405.8万\n【汽车排量】6.0T\n【排放标准】国5\n【车辆里程】10万\n【车辆颜色】深蓝外米内\n【车辆配置】4.0T V8标准版\n【详细车况】原版车况\n【过户次数】5次"
}

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐