wxpy.api.messages.message 源代码

# coding: utf-8
from __future__ import unicode_literals

import logging
import os
import re
from collections import namedtuple
from contextlib import closing
from datetime import datetime
from xml.etree import ElementTree as ETree

from wxpy.api.messages.message_types import *
from wxpy.compatible import PY2
from wxpy.compatible.utils import force_encoded_string_output
from wxpy.utils import repr_message

if PY2:
    # noinspection PyUnresolvedReferences
    from urllib import urlencode
else:
    from urllib.parse import urlencode

logger = logging.getLogger(__name__)

# 公众号推送中的单篇文章内容 (一次可推送多篇)
# 属性: 标题, 摘要, 文章 URL, 封面图片 URL
Article = namedtuple('Article', ['title', 'summary', 'url', 'cover'])

# 转账信息: 金额, 描述, 交易单号 (转账消息可被重复发送,所以要注意核对单号)
Cash = namedtuple('Cash', ['amount', 'description', 'id'])


class Message(object):
    """
    单条消息对象,包括:
    
    * 来自好友、群聊、好友请求等聊天对象的消息
    * 使用机器人账号在手机微信中发送的消息
    
    | 但 **不包括** 代码中通过 .send/reply() 系列方法发出的消息
    | 此类消息请参见 :class:`SentMessage`
    """

    def __init__(self, core, raw):
        self.core = core
        self.bot = self.core.bot

        self.raw = raw

        self._receive_time = datetime.now()

        self._file_ext = None

        # 将 msg.chat.send* 方法绑定到 msg.reply*,例如 msg.chat.send_img => msg.reply_img
        for method in '', '_image', '_sticker', '_file', '_video':
            setattr(self, 'reply' + method, getattr(self.chat, 'send' + method))

    def __hash__(self):
        return hash((Message, self.id))

    @force_encoded_string_output
    def __repr__(self):
        return repr_message(self)

    def __unicode__(self):
        return repr_message(self)

    # basic

    @property
    def type(self):
        """
        消息的类型,目前可为以下值::
        
            # 文本
            TEXT = 'TEXT'
            # 位置
            LOCATION = 'LOCATION'
            # 图片
            IMAGE = 'IMAGE'
            # 语音
            VOICE = 'VOICE'
            # 好友验证
            NEW_FRIEND = 'NEW_FRIEND'
            # 名片
            CARD = 'CARD'
            # 视频
            VIDEO = 'VIDEO'
            # 表情 (不支持商店表情,下载前请先检查 file_size 属性)
            EMOTICON = 'EMOTICON'
            # URL
            URL = 'SHARE_URL'
            # 文件
            FILE = 'FILE'
            # 转账
            CASH = 'CASH'
            # 系统提示
            NOTICE = 'NOTICE'
            # 撤回提示
            RECALLED = 'RECALLED'
            # 未知
            UNKNOWN = 'UNKNOWN'
        
        :rtype: MessageType
        """

        _type = MessageType(
            main=self.raw.get('MsgType'),
            app=self.raw.get('AppMsgType'),
            sub=self.raw.get('SubMsgType'),
        )

        for t in KNOWN_MSG_TYPES:
            if _type == t:
                _type.name = t.name
                break

        return _type

    @property
    def id(self):
        """
        消息的唯一 ID (通常为大于 0 的 64 位整型)
        """
        return self.raw.get('NewMsgId')

    # content

    @property
    def text(self):
        """
        消息的文本内容
        """

        if self.type in (TEXT, NOTICE):
            return self._content
        elif self.type == LOCATION:
            found = re.search(r'^.+(?=:\n)', self._content)
            if found:
                return found.group()
        elif self.type in (URL, CASH):
            return self._content_xml.findtext('.//title')
        elif self.type == NEW_FRIEND:
            return self._content_xml.get('content')
        elif self.type == RECALLED:
            return self._content_xml.findtext('.//replacemsg')
        elif self.type == FILE:
            return self.file_name
        elif self.type == CARD:
            return self.card.name

    @property
    def _file_url(self):

        """
        | 消息中文件的下载地址 (内部使用)
        | 注意: 该 URL 会验证 cookies, 只能使用登陆所在的 session 进行下载
        | 若需下载,请使用 `get_file()` 方法
        """

        uris = self.core.uris
        tree = self._content_xml

        upper_params = {'MsgID': self.id, 'skey': self.core.data.skey, 'type': 'big'}
        lower_params = {'msgid': self.id, 'skey': self.core.data.skey}

        match = {
            IMAGE: (uris.get_msg_img, upper_params),
            STICKER: (uris.get_msg_img, upper_params),
            VOICE: (uris.get_voice, lower_params),
            VIDEO: (uris.get_video, lower_params),
        }.get(self.type)

        if tree and match:
            return '{}?{}'.format(match[0], urlencode(match[1]))
        elif self.type == FILE:
            return '{}?{}'.format(uris.get_media, urlencode(dict(
                sender=self.raw['FromUserName'],
                mediaid=self.media_id,
                filename=self._content_xml.findtext('.//title'),
                fromuser=self.core.data.raw_self['Uin'],
                pass_ticket=self.core.data.pass_ticket,
                webwx_data_ticket=self.core.from_cookies('webwx_data_ticket')
            )))

    @property
    def file_name(self):
        """
        消息中文件的文件名 (含后缀名)
        """
        if self._content_xml and self.type in (IMAGE, STICKER, VOICE, VIDEO):
            return '{}{}'.format(self.id, self.file_ext)
        elif self.type == FILE:
            return self._content_xml.findtext('.//title')

    @property
    def file_ext(self):
        """ 消息中文件的后缀名,例如 .jpeg, .png, .mp3 """

        if self._content_xml and self.type in (IMAGE, STICKER, VOICE, VIDEO):
            if not self._file_ext:
                with closing(self.core.session.get(self._file_url, stream=True)) as resp:
                    if resp.headers.get('Content-Type'):
                        self._file_ext = '.{}'.format(re.findall(r'\w+', resp.headers['Content-Type'])[-1])
            return self._file_ext
        elif self.type == FILE:
            return os.path.splitext(self.file_name)[1]

[文档] def get_file(self, save_path=None): """ 下载图片、视频、语音、附件消息中的文件内容。 可与 :any:`Message.file_name`, :any:`Message.file_ext` 配合使用。 :param save_path: 文件的保存路径。若为 None,将直接返回字节数据 """ if self._file_url: return self.core.download(self._file_url, save_path)
@property def file_size(self): """ 消息中文件的体积大小 """ match = { IMAGE: ('img', 'hdlength'), STICKER: ('img', 'hdlength'), VOICE: ('voicemsg', 'length'), VIDEO: ('videomsg', 'length'), }.get(self.type) if self._content_xml and match: return int(self._content_xml.find(match[0]).get(match[1])) elif self.type == FILE: return int(self._content_xml.findtext('.//totallen')) @property def media_id(self): """ 文件类消息中的文件资源 ID (但图片视频语音等其他消息中为空) """ return self.raw.get('MediaId') # group @property def is_at(self): """ 当消息来自群聊,且被 @ 时,为 True """ from wxpy.api.chats import Group if self.type == TEXT and isinstance(self.chat, Group): return bool(re.search(r'@' + re.escape(self.chat.self.name) + r'(?:\u2005|\s|$)', self.text)) # misc @property def url(self): """ 分享类消息中的网页 URL """ return self.raw.get('Url') @property def articles(self): """ 公众号推送中的文章列表 (首篇的 标题/地址 与消息中的 text/url 相同) 其中,每篇文章均有以下属性: * `title`: 标题 * `summary`: 摘要 * `url`: 文章 URL * `cover`: 封面或缩略图 URL """ from wxpy import MP if self.type == URL and isinstance(self.sender, MP): tree = ETree.fromstring(self.raw['Content']) # noinspection SpellCheckingInspection items = tree.findall('.//mmreader/category/item') article_list = list() for item in items: article = Article( title=item.findtext('title'), summary=item.findtext('digest'), url=item.findtext('url'), cover=item.findtext('cover'), ) article_list.append(article) return article_list @property def card(self): """ * 好友请求中的请求用户 * 名片消息中的推荐用户 """ from wxpy.api.chats import User if self.type in (CARD, NEW_FRIEND): return User(self.core, self.raw.get('RecommendInfo')) @property def recalled_id(self): """ 被撤回消息的消息 ID """ if self.type == RECALLED: return int(self._content_xml.findtext('.//msgid')) @property def cash(self): if self.type == CASH: tree = self._content_xml.find('.//wcpayinfo') return Cash( amount=float(re.search(r'\d+\.\d+', tree.findtext('feedesc')).group()), description=tree.findtext('pay_memo'), id=tree.findtext('transcationid'), ) @property def img_height(self): """ 图片高度 """ return self.raw.get('ImgHeight') @property def img_width(self): """ 图片宽度 """ return self.raw.get('ImgWidth') @property def play_length(self): """ 视频长度 """ return self.raw.get('PlayLength') @property def voice_length(self): """ 语音长度 """ return self.raw.get('VoiceLength') # time @property def create_time(self): """ 服务端发送时间 """ # noinspection PyBroadException try: return datetime.fromtimestamp(self.raw.get('CreateTime')) except: pass @property def receive_time(self): """ 本地接收时间 """ return self._receive_time @property def latency(self): """ 消息的延迟秒数 (发送时间和接收时间的差值) """ create_time = self.create_time if create_time: return (self.receive_time - create_time).total_seconds() @property def location(self): """ 位置消息中的地理位置信息 """ try: ret = ETree.fromstring(self.raw['OriContent']).find('location').attrib try: ret['x'] = float(ret['x']) ret['y'] = float(ret['y']) ret['scale'] = int(ret['scale']) ret['maptype'] = int(ret['maptype']) except (KeyError, ValueError): pass return ret except (TypeError, KeyError, ValueError, ETree.ParseError): pass # chats @property def chat(self): """ 消息所在的聊天会话,即: * 对于自己发送的消息,为消息的接收者 * 对于别人发送的消息,为消息的发送者 :rtype: :class:`wxpy.User`, :class:`wxpy.Group` """ if self.raw.get('FromUserName') == self.core.username: return self.receiver else: return self.sender @property def sender(self): """ 消息的发送者 :rtype: :class:`wxpy.User`, :class:`wxpy.Group` """ return self.core.get_chat_obj(self.raw.get('FromUserName')) @property def receiver(self): """ 消息的接收者 :rtype: :class:`wxpy.User`, :class:`wxpy.Group` """ return self.core.get_chat_obj(self.raw.get('ToUserName')) @property def member(self): """ * 若消息来自群聊,则此属性为消息的实际发送人(具体的群成员) * 若消息来自其他聊天对象(非群聊),则此属性为 None :rtype: NoneType, :class:`wxpy.Member` """ from wxpy.api.chats import Group if isinstance(self.chat, Group): found = re.search(r'^(@[\da-f]+):\n', self.raw['Content']) if found: return self.core.get_chat_obj(found.group(1), self.chat.username) elif self.type != NOTICE: return self.chat.self @property def _content(self): """ Content 字段中去除群员 username 后剩余的部分 """ from wxpy.api.chats import Group if isinstance(self.sender, Group): content = re.sub(r'^@[\da-f]+:\n', '', self.raw['Content']) if self.type == VIDEO: # 发现当有人在群里发视频时,有时 Content 顶部会多 'xxx:\n' 在第二行 (xxx 是该群员的微信 ID) content = re.sub(r'^[\da-zA-Z\-_]+:\n', '', content) return content else: return self.raw['Content'] @property def _content_xml(self): """ Content 字段中的 xml 对象 """ try: return ETree.fromstring(self._content) except ETree.ParseError: pass
[文档] def forward(self, chat, prefix=None, suffix=None): """ 将本消息转发给其他聊天对象 支持以下消息类型 * 文本 (`TEXT`) * 图片 (`IMAGE`) * 自定义表情 (`STICKER`) * 注: 不支持表情商店中的表情 * 视频(`VIDEO`) * 文件 (`FILE`) * 名片 (`CARD`) * 语音 (`VOICE`) * 注: 会以文件方式发送 * 分享链接 (`URL`) * 注: 会转化为 `标题 + 链接` 形式的文本消息 * 地图 (`LOCATION`) * 注: 会转化为 `位置名称 + 地图链接` 形式的文本消息 :param Chat chat: 接收转发消息的聊天对象 :param str prefix: 转发时增加的 **前缀** 文本,原消息为文本时会自动换行 :param str suffix: 转发时增加的 **后缀** 文本,原消息为文本时会自动换行 :return: 若该消息支持转发,返回转发后的 :class:`SentMessage` 对象;反之返回 `NotImplemented` 例如,将公司群中的老板消息转发出来:: from wxpy import * bot = Bot() # 定位公司群 company_group = bot.groups.get('公司微信群') # 定位老板 boss = company_group.get('老板大名') # 将老板的消息转发到文件传输助手 @bot.register(company_group) def forward_boss_message(msg): if msg.member == boss: msg.forward(bot.file_helper, prefix='老板发言') # 阻塞线程 embed() """ logger.info('{}: forwarding to {}: {}'.format(self.bot, chat, self)) if self.type in (TEXT, NOTICE): return chat.send('{}{}{}'.format( '{}\n'.format(prefix) if prefix else '', self.text, '{}\n'.format(suffix) if suffix else '', )) elif self.type in (IMAGE, STICKER, VIDEO, FILE): pass