1
0
Fork 0

Implement history sync on first login

This commit implements history synchronization/back-fill on first login,
as allowed by WhatsApp. Currently, only history for group-chats/MUCs is
allowed, as 1:1 history synchronization is not operational without core
changes/support/privileges for the user's MAM.

NOTE: This commit is experimental, and may or may not break your
sessions. Do not use in production environments.
This commit is contained in:
Alex Palaistras 2023-12-12 16:41:33 +00:00
parent 64f23f03e2
commit be23a369a1
4 changed files with 196 additions and 11 deletions

View File

@ -51,7 +51,7 @@ type EventPayload struct {
Call Call
}
// A Avatar represents a small image representing a Contact or Group.
// An Avatar represents a small image set for a Contact or Group.
type Avatar struct {
ID string // The unique ID for this avatar, used for persistent caching.
URL string // The HTTP URL over which this avatar might be retrieved. Can change for the same ID.
@ -148,6 +148,8 @@ type Message struct {
ReplyBody string // The full body of the message this message is in reply to, if any.
Attachments []Attachment // The list of file (image, video, etc.) attachments contained in this message.
Preview Preview // A short description for the URL provided in the message body, if any.
Receipts []Receipt // The receipt statuses for the message, typically provided alongside historical messages.
Reactions []Message // Reactions attached to message, typically provided alongside historical messages.
}
// A Attachment represents additional binary data (e.g. images, videos, documents) provided alongside
@ -480,6 +482,124 @@ func extensionByType(typ string) string {
return ".bin"
}
// newEventFromHistory returns event data meant for [Session.propagateEvent] for the primitive
// history message given. Currently, only events related to group-chats will be handled, due to
// uncertain support for history back-fills on 1:1 chats.
//
// Otherwise, the implementation largely follows that of [newMessageEvent], however the base types
// used by these two functions differ in many small ways which prevent unifying the approach.
//
// Typically, this will return [EventMessage] events with appropriate [Message] payloads; missed
// calls return an [EventCall] event, and unknown or invalid messages will return an [EventUnknown]
// event with nil data.
func newEventFromHistory(client *whatsmeow.Client, info *proto.WebMessageInfo) (EventKind, *EventPayload) {
	// Handle message as group message if remote JID is a group JID in the absence of any other,
	// specific signal, or don't handle at all if no (valid) group JID is found.
	var jid = info.GetKey().GetRemoteJid()
	if j, err := types.ParseJID(jid); err != nil || j.Server != types.GroupServer {
		return EventUnknown, nil
	}

	// Resolve our own bare JID once, guarding against sessions with no device identity attached;
	// an empty value means "unknown" for the checks below.
	var selfJID string
	if client.Store.ID != nil {
		selfJID = client.Store.ID.ToNonAD().String()
	}

	// Set basic data for message, to be potentially amended depending on the concrete version of
	// the underlying message.
	var message = Message{
		Kind:      MessagePlain,
		ID:        info.GetKey().GetId(),
		GroupJID:  jid,
		Body:      info.GetMessage().GetConversation(),
		Timestamp: int64(info.GetMessageTimestamp()),
		IsCarbon:  info.GetKey().GetFromMe(),
	}

	switch {
	case info.Participant != nil:
		message.JID = info.GetParticipant()
	case info.GetKey().GetFromMe() && selfJID != "":
		message.JID = selfJID
	default:
		// It's likely we cannot handle this message correctly if we don't know the concrete
		// sender, so just ignore it completely.
		return EventUnknown, nil
	}

	// Handle protocol messages (such as message deletion or editing), while ignoring known
	// unhandled types.
	switch info.GetMessageStubType() {
	case proto.WebMessageInfo_CIPHERTEXT:
		return EventUnknown, nil
	case proto.WebMessageInfo_CALL_MISSED_VOICE, proto.WebMessageInfo_CALL_MISSED_VIDEO:
		return EventCall, &EventPayload{Call: Call{
			State:     CallMissed,
			JID:       jid,
			Timestamp: int64(info.GetMessageTimestamp()),
		}}
	case proto.WebMessageInfo_REVOKE:
		// The first stub parameter is used as the ID of the message being revoked; without it
		// there is nothing to act on.
		p := info.GetMessageStubParameters()
		if len(p) == 0 {
			return EventUnknown, nil
		}
		message.Kind = MessageRevoke
		message.ID = p[0]
		return EventMessage, &EventPayload{Message: message}
	}

	// Handle emoji reactions to this message; reactions with empty text are skipped.
	for _, r := range info.GetReactions() {
		if r.GetText() == "" {
			continue
		}
		message.Reactions = append(message.Reactions, Message{
			Kind:      MessageReaction,
			ID:        r.GetKey().GetId(),
			JID:       r.GetKey().GetRemoteJid(),
			Body:      r.GetText(),
			Timestamp: r.GetSenderTimestampMs() / 1000, // Milliseconds to seconds.
			IsCarbon:  r.GetKey().GetFromMe(),
		})
	}

	// Handle message attachments, if any.
	attach, err := getMessageAttachments(client, info.GetMessage())
	if err != nil {
		client.Log.Errorf("Failed getting message attachments: %s", err)
		return EventUnknown, nil
	}
	if len(attach) > 0 {
		message.Attachments = append(message.Attachments, attach...)
		message.Kind = MessageAttachment
	}

	// Handle pre-set receipt status, if any.
	for _, r := range info.GetUserReceipt() {
		// Ignore self-receipts for the moment, as these cannot be handled correctly by the adapter.
		if r.GetUserJid() == selfJID {
			continue
		}

		var receipt = Receipt{MessageIDs: []string{message.ID}, JID: r.GetUserJid(), GroupJID: message.GroupJID}
		// The receipt kind is derived from the message-wide status; any other status leaves the
		// kind and timestamp at their zero values.
		switch info.GetStatus() {
		case proto.WebMessageInfo_DELIVERY_ACK:
			receipt.Kind = ReceiptDelivered
			receipt.Timestamp = r.GetReceiptTimestamp()
		case proto.WebMessageInfo_READ:
			receipt.Kind = ReceiptRead
			receipt.Timestamp = r.GetReadTimestamp()
		}
		message.Receipts = append(message.Receipts, receipt)
	}

	// Get extended information from message, if available. Extended messages typically represent
	// messages with additional context, such as replies, forwards, etc.
	if e := info.GetMessage().GetExtendedTextMessage(); e != nil {
		if message.Body == "" {
			message.Body = e.GetText()
		}
		message = getMessageWithContext(message, e.GetContextInfo())
	}

	// Ignore obviously invalid messages.
	if message.Kind == MessagePlain && message.Body == "" {
		return EventUnknown, nil
	}

	return EventMessage, &EventPayload{Message: message}
}
// ChatStateKind represents the different kinds of chat-states possible in WhatsApp.
type ChatStateKind int
@ -575,7 +695,7 @@ const (
)
// A Group represents a named, many-to-many chat space which may be joined or left at will. All
// fields apart from the group JID, are considered to be optional, and may not be set in cases where
// fields apart from the group JID are considered to be optional, and may not be set in cases where
// group information is being updated against previous assumed state. Groups in WhatsApp are
// generally invited to out-of-band with respect to overarching adaptor; see the documentation for
// [Session.GetGroups] for more information.
@ -614,9 +734,9 @@ type GroupParticipant struct {
Action GroupParticipantAction // The specific action to take for this participant; typically to add.
}
// NewReceiptEvent returns event data meant for [Session.propagateEvent] for the primive group
// event given. Group data returned by this function can be partial, and callers should take care
// to only handle non-empty values.
// newGroupEvent returns event data meant for [Session.propagateEvent] for the primitive group event
// given. Group data returned by this function can be partial, and callers should take care to only
// handle non-empty values.
func newGroupEvent(evt *events.GroupInfo) (EventKind, *EventPayload) {
var group = Group{JID: evt.JID.ToNonAD().String()}
if evt.Name != nil {

View File

@ -1,6 +1,6 @@
import re
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from slidge.group import LegacyBookmarks, LegacyMUC, LegacyParticipant, MucType
from slixmpp.exceptions import XMPPError
@ -35,10 +35,9 @@ class Participant(LegacyParticipant):
class MUC(LegacyMUC[str, str, Participant, str]):
session: "Session"
REACTIONS_SINGLE_EMOJI = True
type = MucType.GROUP
REACTIONS_SINGLE_EMOJI = True
_ALL_INFO_FILLED_ON_STARTUP = True
def __init__(self, *a, **kw):
@ -55,6 +54,28 @@ class MUC(LegacyMUC[str, str, Participant, str]):
if avatar.URL:
await self.set_avatar(avatar.URL, avatar.ID)
async def backfill(
    self,
    oldest_message_id: Optional[str] = None,
    oldest_message_date: Optional[datetime] = None,
):
    """
    Ask WhatsApp for history preceding the oldest known message, identified by its ID and date.

    Nothing is requested unless the given ID is present in the session's ``muc_sent_msg_ids``.
    """
    session = self.session

    # WhatsApp requires a full reference to the last seen message in performing on-demand sync.
    if oldest_message_id not in session.muc_sent_msg_ids:
        return

    # Fall back to neutral values for any part of the reference that was not provided.
    reference_id = oldest_message_id or ""
    reference_is_carbon = False
    if oldest_message_id:
        reference_is_carbon = session.message_is_carbon(self, oldest_message_id)
    reference_timestamp = 0
    if oldest_message_date:
        reference_timestamp = int(oldest_message_date.timestamp())

    reference = whatsapp.Message(
        ID=reference_id,
        IsCarbon=reference_is_carbon,
        Timestamp=reference_timestamp,
    )
    session.whatsapp.RequestMessageHistory(self.legacy_id, reference)
def get_message_sender(self, legacy_msg_id: str):
sender_legacy_id = self.sent.get(legacy_msg_id)
if sender_legacy_id is None:

View File

@ -40,6 +40,10 @@ const (
// (e.g. for an initial interval of 2 hours, the final value will range from 1 to 3 hours) in
// order to provide a more natural interaction with remote WhatsApp servers.
presenceRefreshInterval = 12 * time.Hour
// The maximum number of messages to request at a time when performing on-demand history
// synchronization.
maxHistorySyncMessages = 50
)
// HandleEventFunc represents a handler for incoming events sent to the Python Session, accepting an
@ -80,6 +84,7 @@ func (s *Session) Login() error {
s.client = whatsmeow.NewClient(store, s.gateway.logger)
s.client.AddEventHandler(s.handleEvent)
s.client.AutomaticMessageRerequestFromPhone = true
// Refresh contact presences on a set interval, to avoid issues with WhatsApp dropping them
// entirely. Contact presences are refreshed only if our current status is set to "available";
@ -485,6 +490,35 @@ func (s *Session) SetAvatar(resourceID, avatarPath string) (string, error) {
}
}
// RequestMessageHistory sends an asynchronous request for message history related to the given
// resource (e.g. Contact or Group JID), ending at the oldest message given. Messages returned from
// history should then be handled as a `HistorySync` event of type `ON_DEMAND`, in the session-wide
// event handler.
//
// At most [maxHistorySyncMessages] messages are requested at a time. An error will be returned if
// the session is unauthenticated, if the resource ID cannot be parsed as a JID, or if sending the
// request fails; underlying errors are wrapped and remain available via [errors.Is]/[errors.As].
func (s *Session) RequestMessageHistory(resourceID string, oldestMessage Message) error {
	if s.client == nil || s.client.Store.ID == nil {
		return fmt.Errorf("Cannot request history for unauthenticated session")
	}

	jid, err := types.ParseJID(resourceID)
	if err != nil {
		return fmt.Errorf("Could not parse JID for history request: %w", err)
	}

	// Only the ID, chat, direction and timestamp of the oldest message given are passed along as
	// the anchor for the request.
	info := &types.MessageInfo{
		ID:            oldestMessage.ID,
		MessageSource: types.MessageSource{Chat: jid, IsFromMe: oldestMessage.IsCarbon},
		Timestamp:     time.Unix(oldestMessage.Timestamp, 0).UTC(),
	}

	// The request is sent as a peer message addressed to our own (non-AD) device JID.
	req := s.client.BuildHistorySyncRequest(info, maxHistorySyncMessages)
	_, err = s.client.SendMessage(context.Background(), s.device.JID().ToNonAD(), req, whatsmeow.SendRequestExtra{Peer: true})
	if err != nil {
		return fmt.Errorf("Failed to request history for %s: %w", resourceID, err)
	}

	return nil
}
// SetEventHandler assigns the given handler function for propagating internal events into the Python
// gateway. Note that the event handler function is not entirely safe to use directly, and all calls
// should instead be made via the [propagateEvent] function.
@ -540,7 +574,7 @@ func (s *Session) handleEvent(evt interface{}) {
case *events.HistorySync:
switch evt.Data.GetSyncType() {
case proto.HistorySync_PUSH_NAME:
for _, n := range evt.Data.Pushnames {
for _, n := range evt.Data.GetPushnames() {
jid, err := types.ParseJID(n.GetId())
if err != nil {
continue
@ -550,6 +584,12 @@ func (s *Session) handleEvent(evt interface{}) {
s.gateway.logger.Warnf("Failed to subscribe to presence for %s", jid)
}
}
case proto.HistorySync_INITIAL_BOOTSTRAP, proto.HistorySync_RECENT, proto.HistorySync_ON_DEMAND:
for _, c := range evt.Data.GetConversations() {
for _, msg := range c.GetMessages() {
s.propagateEvent(newEventFromHistory(s.client, msg.GetMessage()))
}
}
}
case *events.Message:
s.propagateEvent(newMessageEvent(s.client, evt))

View File

@ -292,6 +292,10 @@ class Session(BaseSession[str, Recipient]):
contact.react(
legacy_msg_id=message.ID, emojis=emojis, carbon=message.IsCarbon
)
for ptr in message.Receipts:
await self.handle_receipt(whatsapp.Receipt(handle=ptr))
for ptr in message.Reactions:
await self.handle_message(whatsapp.Message(handle=ptr))
async def send_text(
self,
@ -419,7 +423,7 @@ class Session(BaseSession[str, Recipient]):
Slidge core makes sure that the emojis parameter is always empty or a
*single* emoji.
"""
is_carbon = self._is_carbon(c, legacy_msg_id)
is_carbon = self.message_is_carbon(c, legacy_msg_id)
message_sender_id = (
c.get_message_sender(legacy_msg_id)
if not is_carbon and isinstance(c, MUC)
@ -481,7 +485,7 @@ class Session(BaseSession[str, Recipient]):
else:
return await self.contacts.by_legacy_id(legacy_contact_id)
def _is_carbon(self, c: Recipient, legacy_msg_id: str):
def message_is_carbon(self, c: Recipient, legacy_msg_id: str):
if c.is_group:
return legacy_msg_id in self.muc_sent_msg_ids
else: