1
0
Fork 0

Implement history sync on first login

This commit implements history synchronization/back-fill on first login,
as allowed by WhatsApp. Currently, only history for group-chats/MUCs is
allowed, as 1:1 history synchronization is not operational without core
changes/support/privileges for the user's MAM.

In addition, even though on-demand backfills have been implemented, they
are considered to be inert/of limited use until persistent message
storage is implemented, as the `session.muc_sent_msg_ids` storage
currently utilized is in-memory only, and WhatsApp requires full message
metadata for on-demand history synchronization.
This commit is contained in:
Alex Palaistras 2023-12-12 16:41:33 +00:00
parent ae21c6f365
commit 9caf826b0b
4 changed files with 225 additions and 35 deletions

View File

@ -51,7 +51,7 @@ type EventPayload struct {
Call Call
}
// A Avatar represents a small image representing a Contact or Group.
// A Avatar represents a small image set for a Contact or Group.
type Avatar struct {
ID string // The unique ID for this avatar, used for persistent caching.
URL string // The HTTP URL over which this avatar might be retrieved. Can change for the same ID.
@ -149,6 +149,8 @@ type Message struct {
Attachments []Attachment // The list of file (image, video, etc.) attachments contained in this message.
Preview Preview // A short description for the URL provided in the message body, if any.
MentionJIDs []string // A list of JIDs mentioned in this message, if any.
Receipts []Receipt // The receipt statuses for the message, typically provided alongside historical messages.
Reactions []Message // Reactions attached to message, typically provided alongside historical messages.
}
// A Attachment represents additional binary data (e.g. images, videos, documents) provided alongside
@ -220,28 +222,15 @@ func newMessageEvent(client *whatsmeow.Client, evt *events.Message) (EventKind,
}
// Handle message attachments, if any.
if attach, err := getMessageAttachments(client, evt.Message); err != nil {
if attach, context, err := getMessageAttachments(client, evt.Message); err != nil {
client.Log.Errorf("Failed getting message attachments: %s", err)
return EventUnknown, nil
} else if len(attach) > 0 {
message.Attachments = append(message.Attachments, attach...)
message.Kind = MessageAttachment
}
// Get contact vCard from message, if any, converting it into an inline attachment.
if c := evt.Message.GetContactMessage(); c != nil {
tmp, err := createTempFile([]byte(c.GetVcard()))
if err != nil {
client.Log.Errorf("Failed getting contact message: %s", err)
return EventUnknown, nil
if context != nil {
message = getMessageWithContext(message, context)
}
message.Attachments = append(message.Attachments, Attachment{
MIME: "text/vcard",
Filename: c.GetDisplayName() + ".vcf",
Path: tmp,
})
message.Kind = MessageAttachment
message = getMessageWithContext(message, c.GetContextInfo())
}
// Get extended information from message, if available. Extended messages typically represent
@ -286,8 +275,9 @@ func getMessageWithContext(message Message, info *proto.ContextInfo) Message {
// GetMessageAttachments fetches and decrypts attachments (images, audio, video, or documents) sent
// via WhatsApp. Any failures in retrieving any attachment will return an error immediately.
func getMessageAttachments(client *whatsmeow.Client, message *proto.Message) ([]Attachment, error) {
func getMessageAttachments(client *whatsmeow.Client, message *proto.Message) ([]Attachment, *proto.ContextInfo, error) {
var result []Attachment
var context *proto.ContextInfo
var kinds = []whatsmeow.DownloadableMessage{
message.GetImageMessage(),
message.GetAudioMessage(),
@ -325,19 +315,33 @@ func getMessageAttachments(client *whatsmeow.Client, message *proto.Message) ([]
// Attempt to download and decrypt raw attachment data, if any.
data, err := client.Download(msg)
if err != nil {
return nil, err
return nil, nil, err
}
tmp, err := createTempFile(data)
if err != nil {
return nil, fmt.Errorf("failed writing to temporary file: %w", err)
return nil, nil, fmt.Errorf("failed writing to temporary file: %w", err)
}
a.Path = tmp
result = append(result, a)
}
return result, nil
// Handle any contact vCard as attachment.
if c := message.GetContactMessage(); c != nil {
tmp, err := createTempFile([]byte(c.GetVcard()))
if err != nil {
return nil, nil, fmt.Errorf("Failed getting contact message: %w", err)
}
result = append(result, Attachment{
MIME: "text/vcard",
Filename: c.GetDisplayName() + ".vcf",
Path: tmp,
})
context = c.GetContextInfo()
}
return result, context, nil
}
// KnownMediaTypes represents MIME type to WhatsApp media types known to be handled by WhatsApp in a
@ -482,6 +486,127 @@ func extensionByType(typ string) string {
return ".bin"
}
// NewEventFromHistory returns event data meant for [Session.propagateEvent] for the primive history
// message given. Currently, only events related to group-chats will be handled, due to uncertain
// support for history back-fills on 1:1 chats.
//
// Otherwise, the implementation largely follows that of [newMessageEvent], however the base types
// used by these two functions differ in many small ways which prevent unifying the approach.
//
// Typically, this will return [EventMessage] events with appropriate [Message] payloads; unknown or
// invalid messages will return an [EventUnknown] event with nil data.
func newEventFromHistory(client *whatsmeow.Client, info *proto.WebMessageInfo) (EventKind, *EventPayload) {
// Handle message as group message is remote JID is a group JID in the absence of any other,
// specific signal, or don't handle at all if no group JID is found.
var jid = info.GetKey().GetRemoteJid()
if j, _ := types.ParseJID(jid); j.Server != types.GroupServer {
return EventUnknown, nil
}
// Set basic data for message, to be potentially amended depending on the concrete version of
// the underlying message.
var message = Message{
Kind: MessagePlain,
ID: info.GetKey().GetId(),
GroupJID: info.GetKey().GetRemoteJid(),
Body: info.GetMessage().GetConversation(),
Timestamp: int64(info.GetMessageTimestamp()),
IsCarbon: info.GetKey().GetFromMe(),
}
if info.Participant != nil {
message.JID = info.GetParticipant()
} else if info.GetKey().GetFromMe() {
message.JID = client.Store.ID.ToNonAD().String()
} else {
// It's likely we cannot handle this message correctly if we don't know the concrete
// sender, so just ignore it completely.
return EventUnknown, nil
}
// Handle handle protocol messages (such as message deletion or editing), while ignoring known
// unhandled types.
switch info.GetMessageStubType() {
case proto.WebMessageInfo_CIPHERTEXT:
return EventUnknown, nil
case proto.WebMessageInfo_CALL_MISSED_VOICE, proto.WebMessageInfo_CALL_MISSED_VIDEO:
return EventCall, &EventPayload{Call: Call{
State: CallMissed,
JID: info.GetKey().GetRemoteJid(),
Timestamp: int64(info.GetMessageTimestamp()),
}}
case proto.WebMessageInfo_REVOKE:
if p := info.GetMessageStubParameters(); len(p) > 0 {
message.Kind = MessageRevoke
message.ID = p[0]
return EventMessage, &EventPayload{Message: message}
} else {
return EventUnknown, nil
}
}
// Handle emoji reaction to existing message.
for _, r := range info.GetReactions() {
if r.GetText() != "" {
message.Reactions = append(message.Reactions, Message{
Kind: MessageReaction,
ID: r.GetKey().GetId(),
JID: r.GetKey().GetRemoteJid(),
Body: r.GetText(),
Timestamp: r.GetSenderTimestampMs() / 1000,
IsCarbon: r.GetKey().GetFromMe(),
})
}
}
// Handle message attachments, if any.
if attach, context, err := getMessageAttachments(client, info.GetMessage()); err != nil {
client.Log.Errorf("Failed getting message attachments: %s", err)
return EventUnknown, nil
} else if len(attach) > 0 {
message.Attachments = append(message.Attachments, attach...)
message.Kind = MessageAttachment
if context != nil {
message = getMessageWithContext(message, context)
}
}
// Handle pre-set receipt status, if any.
for _, r := range info.GetUserReceipt() {
// Ignore self-receipts for the moment, as these cannot be handled correctly by the adapter.
if client.Store.ID.ToNonAD().String() == r.GetUserJid() {
continue
}
var receipt = Receipt{MessageIDs: []string{message.ID}, JID: r.GetUserJid(), GroupJID: message.GroupJID}
switch info.GetStatus() {
case proto.WebMessageInfo_DELIVERY_ACK:
receipt.Kind = ReceiptDelivered
receipt.Timestamp = r.GetReceiptTimestamp()
case proto.WebMessageInfo_READ:
receipt.Kind = ReceiptRead
receipt.Timestamp = r.GetReadTimestamp()
}
message.Receipts = append(message.Receipts, receipt)
}
// Get extended information from message, if available. Extended messages typically represent
// messages with additional context, such as replies, forwards, etc.
if e := info.GetMessage().GetExtendedTextMessage(); e != nil {
if message.Body == "" {
message.Body = e.GetText()
}
message = getMessageWithContext(message, e.GetContextInfo())
}
// Ignore obviously invalid messages.
if message.Kind == MessagePlain && message.Body == "" {
return EventUnknown, nil
}
return EventMessage, &EventPayload{Message: message}
}
// ChatStateKind represents the different kinds of chat-states possible in WhatsApp.
type ChatStateKind int
@ -577,7 +702,7 @@ const (
)
// A Group represents a named, many-to-many chat space which may be joined or left at will. All
// fields apart from the group JID, are considered to be optional, and may not be set in cases where
// fields apart from the group JID are considered to be optional, and may not be set in cases where
// group information is being updated against previous assumed state. Groups in WhatsApp are
// generally invited to out-of-band with respect to overarching adaptor; see the documentation for
// [Session.GetGroups] for more information.
@ -616,9 +741,9 @@ type GroupParticipant struct {
Action GroupParticipantAction // The specific action to take for this participant; typically to add.
}
// NewReceiptEvent returns event data meant for [Session.propagateEvent] for the primive group
// event given. Group data returned by this function can be partial, and callers should take care
// to only handle non-empty values.
// NewGroupEvent returns event data meant for [Session.propagateEvent] for the primive group event
// given. Group data returned by this function can be partial, and callers should take care to only
// handle non-empty values.
func newGroupEvent(evt *events.GroupInfo) (EventKind, *EventPayload) {
var group = Group{JID: evt.JID.ToNonAD().String()}
if evt.Name != nil {

View File

@ -37,10 +37,9 @@ class Participant(LegacyParticipant):
class MUC(LegacyMUC[str, str, Participant, str]):
session: "Session"
REACTIONS_SINGLE_EMOJI = True
type = MucType.GROUP
REACTIONS_SINGLE_EMOJI = True
_ALL_INFO_FILLED_ON_STARTUP = True
HAS_DESCRIPTION = False
@ -59,6 +58,28 @@ class MUC(LegacyMUC[str, str, Participant, str]):
if avatar.URL:
await self.set_avatar(avatar.URL, avatar.ID)
async def backfill(
self,
oldest_message_id: Optional[str] = None,
oldest_message_date: Optional[datetime] = None,
):
"""
Request history for messages older than the oldest message given by ID and date.
"""
if oldest_message_id not in self.session.muc_sent_msg_ids:
# WhatsApp requires a full reference to the last seen message in performing on-demand sync.
return
oldest_message = whatsapp.Message(
ID=oldest_message_id or "",
IsCarbon=self.session.message_is_carbon(self, oldest_message_id)
if oldest_message_id
else False,
Timestamp=int(oldest_message_date.timestamp())
if oldest_message_date
else 0,
)
self.session.whatsapp.RequestMessageHistory(self.legacy_id, oldest_message)
def get_message_sender(self, legacy_msg_id: str):
sender_legacy_id = self.sent.get(legacy_msg_id)
if sender_legacy_id is None:

View File

@ -40,6 +40,10 @@ const (
// (e.g. for an initial interval of 2 hours, the final value will range from 1 to 3 hours) in
// order to provide a more natural interaction with remote WhatsApp servers.
presenceRefreshInterval = 12 * time.Hour
// The maximum number of messages to request at a time when performing on-demand history
// synchronization.
maxHistorySyncMessages = 50
)
// HandleEventFunc represents a handler for incoming events sent to the Python Session, accepting an
@ -80,6 +84,7 @@ func (s *Session) Login() error {
s.client = whatsmeow.NewClient(store, s.gateway.logger)
s.client.AddEventHandler(s.handleEvent)
s.client.AutomaticMessageRerequestFromPhone = true
// Refresh contact presences on a set interval, to avoid issues with WhatsApp dropping them
// entirely. Contact presences are refreshed only if our current status is set to "available";
@ -607,6 +612,35 @@ func (s *Session) FindContact(phone string) (Contact, error) {
return Contact{JID: resp[0].JID.ToNonAD().String()}, nil
}
// RequestMessageHistory sends and asynchronous request for message history related to the given
// resource (e.g. Contact or Group JID), ending at the oldest message given. Messages returned from
// history should then be handled as a `HistorySync` event of type `ON_DEMAND`, in the session-wide
// event handler. An error will be returned if requesting history fails for any reason.
func (s *Session) RequestMessageHistory(resourceID string, oldestMessage Message) error {
if s.client == nil || s.client.Store.ID == nil {
return fmt.Errorf("Cannot request history for unauthenticated session")
}
jid, err := types.ParseJID(resourceID)
if err != nil {
return fmt.Errorf("Could not parse JID for history request: %s", err)
}
info := &types.MessageInfo{
ID: oldestMessage.ID,
MessageSource: types.MessageSource{Chat: jid, IsFromMe: oldestMessage.IsCarbon},
Timestamp: time.Unix(oldestMessage.Timestamp, 0).UTC(),
}
req := s.client.BuildHistorySyncRequest(info, maxHistorySyncMessages)
_, err = s.client.SendMessage(context.Background(), s.device.JID().ToNonAD(), req, whatsmeow.SendRequestExtra{Peer: true})
if err != nil {
return fmt.Errorf("Failed to request history for %s: %s", resourceID, err)
}
return nil
}
// SetEventHandler assigns the given handler function for propagating internal events into the Python
// gateway. Note that the event handler function is not entirely safe to use directly, and all calls
// should instead be made via the [propagateEvent] function.
@ -662,7 +696,7 @@ func (s *Session) handleEvent(evt interface{}) {
case *events.HistorySync:
switch evt.Data.GetSyncType() {
case proto.HistorySync_PUSH_NAME:
for _, n := range evt.Data.Pushnames {
for _, n := range evt.Data.GetPushnames() {
jid, err := types.ParseJID(n.GetId())
if err != nil {
continue
@ -672,6 +706,12 @@ func (s *Session) handleEvent(evt interface{}) {
s.gateway.logger.Warnf("Failed to subscribe to presence for %s", jid)
}
}
case proto.HistorySync_INITIAL_BOOTSTRAP, proto.HistorySync_RECENT, proto.HistorySync_ON_DEMAND:
for _, c := range evt.Data.GetConversations() {
for _, msg := range c.GetMessages() {
s.propagateEvent(newEventFromHistory(s.client, msg.GetMessage()))
}
}
}
case *events.Message:
s.propagateEvent(newMessageEvent(s.client, evt))

View File

@ -236,6 +236,10 @@ class Session(BaseSession[str, Recipient]):
contact.react(
legacy_msg_id=message.ID, emojis=emojis, carbon=message.IsCarbon
)
for receipt in message.Receipts:
await self.handle_receipt(receipt)
for reaction in message.Reactions:
await self.handle_message(reaction)
async def on_text(
self,
@ -370,7 +374,7 @@ class Session(BaseSession[str, Recipient]):
Slidge core makes sure that the emojis parameter is always empty or a
*single* emoji.
"""
is_carbon = self.__is_carbon(c, legacy_msg_id)
is_carbon = self.message_is_carbon(c, legacy_msg_id)
message_sender_id = (
c.get_message_sender(legacy_msg_id)
if not is_carbon and isinstance(c, MUC)
@ -475,6 +479,12 @@ class Session(BaseSession[str, Recipient]):
items=[{"phone": cast(str, phone), "jid": contact.jid.bare}],
)
def message_is_carbon(self, c: Recipient, legacy_msg_id: str):
if c.is_group:
return legacy_msg_id in self.muc_sent_msg_ids
else:
return legacy_msg_id in self.sent
def __get_connected_status_message(self):
return f"Connected as {self.user_phone}"
@ -552,12 +562,6 @@ class Session(BaseSession[str, Recipient]):
else:
return await self.contacts.by_legacy_id(legacy_contact_id)
def __is_carbon(self, c: Recipient, legacy_msg_id: str):
if c.is_group:
return legacy_msg_id in self.muc_sent_msg_ids
else:
return legacy_msg_id in self.sent
class Attachment(LegacyAttachment):
@staticmethod