mirror of
https://git.sr.ht/~nicoco/slidge-whatsapp
synced 2024-06-03 01:35:30 +00:00
fix deadlock by introducing more locks
don't ask
This commit is contained in:
parent
c48c5baa55
commit
a9d65a2cac
|
@ -92,13 +92,19 @@ func (w *Gateway) SetLogHandler(h HandleLogFunc) {
|
||||||
w.logger = HandleLogFunc(func(level ErrorLevel, message string) {
|
w.logger = HandleLogFunc(func(level ErrorLevel, message string) {
|
||||||
// Don't allow other Goroutines from using this thread, as this might lead to concurrent
|
// Don't allow other Goroutines from using this thread, as this might lead to concurrent
|
||||||
// use of the GIL, which can lead to crashes.
|
// use of the GIL, which can lead to crashes.
|
||||||
|
h(LevelDebug, "Locking OS thread")
|
||||||
runtime.LockOSThread()
|
runtime.LockOSThread()
|
||||||
defer runtime.UnlockOSThread()
|
defer UnlockOSThread(h)
|
||||||
|
|
||||||
h(level, message)
|
h(level, message)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UnlockOSThread(h HandleLogFunc) {
|
||||||
|
h(LevelDebug, "Unlocking OS thread")
|
||||||
|
runtime.UnlockOSThread()
|
||||||
|
}
|
||||||
|
|
||||||
// Init performs initialization procedures for the Gateway, and is expected to be run before any
|
// Init performs initialization procedures for the Gateway, and is expected to be run before any
|
||||||
// calls to [Gateway.Session].
|
// calls to [Gateway.Session].
|
||||||
func (w *Gateway) Init() error {
|
func (w *Gateway) Init() error {
|
||||||
|
|
|
@ -665,12 +665,18 @@ func (s *Session) propagateEvent(kind EventKind, payload *EventPayload) {
|
||||||
|
|
||||||
// Don't allow other Goroutines from using this thread, as this might lead to concurrent use of
|
// Don't allow other Goroutines from using this thread, as this might lead to concurrent use of
|
||||||
// the GIL, which can lead to crashes.
|
// the GIL, which can lead to crashes.
|
||||||
|
s.gateway.logger.Debugf("Locking OS Thread (propagateEvent)")
|
||||||
runtime.LockOSThread()
|
runtime.LockOSThread()
|
||||||
defer runtime.UnlockOSThread()
|
defer UnlockOSThread2(s)
|
||||||
|
|
||||||
s.eventHandler(kind, payload)
|
s.eventHandler(kind, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UnlockOSThread2(s *Session) {
|
||||||
|
s.gateway.logger.Debugf("Unlocking OS thread (propagateEvent)")
|
||||||
|
runtime.UnlockOSThread()
|
||||||
|
}
|
||||||
|
|
||||||
// HandleEvent processes the given incoming WhatsApp event, checking its concrete type and
|
// HandleEvent processes the given incoming WhatsApp event, checking its concrete type and
|
||||||
// propagating it to the adapter event handler. Unknown or unhandled events are ignored, and any
|
// propagating it to the adapter event handler. Unknown or unhandled events are ignored, and any
|
||||||
// errors that occur during processing are logged.
|
// errors that occur during processing are logged.
|
||||||
|
|
|
@ -7,7 +7,7 @@ from pathlib import Path
|
||||||
from re import search
|
from re import search
|
||||||
from shelve import open
|
from shelve import open
|
||||||
from tempfile import mkstemp
|
from tempfile import mkstemp
|
||||||
from threading import Lock
|
from threading import Event, Lock
|
||||||
from typing import Optional, Union, cast
|
from typing import Optional, Union, cast
|
||||||
|
|
||||||
from aiohttp import ClientSession
|
from aiohttp import ClientSession
|
||||||
|
@ -75,7 +75,7 @@ class Session(BaseSession[str, Recipient]):
|
||||||
self.whatsapp = self.xmpp.whatsapp.NewSession(device)
|
self.whatsapp = self.xmpp.whatsapp.NewSession(device)
|
||||||
self._handle_event = make_sync(self.handle_event, self.xmpp.loop)
|
self._handle_event = make_sync(self.handle_event, self.xmpp.loop)
|
||||||
self.whatsapp.SetEventHandler(self._handle_event)
|
self.whatsapp.SetEventHandler(self._handle_event)
|
||||||
self._connected = self.xmpp.loop.create_future()
|
self._connected = Event()
|
||||||
self.user_phone: Optional[str] = None
|
self.user_phone: Optional[str] = None
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
|
|
||||||
|
@ -86,8 +86,8 @@ class Session(BaseSession[str, Recipient]):
|
||||||
or will re-connect to a previously existing Linked Device session.
|
or will re-connect to a previously existing Linked Device session.
|
||||||
"""
|
"""
|
||||||
self.whatsapp.Login()
|
self.whatsapp.Login()
|
||||||
self._connected = self.xmpp.loop.create_future()
|
await self.xmpp.loop.run_in_executor(None, self._connected.wait)
|
||||||
return await self._connected
|
return self.__get_connected_status_message()
|
||||||
|
|
||||||
async def logout(self):
|
async def logout(self):
|
||||||
"""
|
"""
|
||||||
|
@ -104,29 +104,36 @@ class Session(BaseSession[str, Recipient]):
|
||||||
state required for processing by the Gateway itself, and will do minimal processing themselves.
|
state required for processing by the Gateway itself, and will do minimal processing themselves.
|
||||||
"""
|
"""
|
||||||
data = whatsapp.EventPayload(handle=ptr)
|
data = whatsapp.EventPayload(handle=ptr)
|
||||||
if event == whatsapp.EventQRCode:
|
self.log.debug("Handling event %s", data)
|
||||||
|
if event == whatsapp.EventConnected:
|
||||||
|
self.contacts.user_legacy_id = data.ConnectedJID
|
||||||
|
self.user_phone = "+" + data.ConnectedJID.split("@")[0]
|
||||||
|
self.log.debug("Setting connected")
|
||||||
|
self._connected.set()
|
||||||
|
self.log.debug("Connected set")
|
||||||
|
# this sends the gateway status twice on first login,
|
||||||
|
# but ensures the gateway status is updated when re-pairing
|
||||||
|
self.send_gateway_status(self.__get_connected_status_message(), show="chat")
|
||||||
|
return
|
||||||
|
elif event == whatsapp.EventQRCode:
|
||||||
self.send_gateway_status("QR Scan Needed", show="dnd")
|
self.send_gateway_status("QR Scan Needed", show="dnd")
|
||||||
await self.send_qr(data.QRCode)
|
await self.send_qr(data.QRCode)
|
||||||
|
return
|
||||||
elif event == whatsapp.EventPair:
|
elif event == whatsapp.EventPair:
|
||||||
self.send_gateway_message(MESSAGE_PAIR_SUCCESS)
|
self.send_gateway_message(MESSAGE_PAIR_SUCCESS)
|
||||||
with open(str(self.user_shelf_path)) as shelf:
|
with open(str(self.user_shelf_path)) as shelf:
|
||||||
shelf["device_id"] = data.PairDeviceID
|
shelf["device_id"] = data.PairDeviceID
|
||||||
elif event == whatsapp.EventConnected:
|
return
|
||||||
if self._connected.done():
|
|
||||||
# On re-pair, Session.login() is not called by slidge core, so
|
|
||||||
# the status message is not updated
|
|
||||||
self.send_gateway_status(
|
|
||||||
self.__get_connected_status_message(), show="chat"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.contacts.user_legacy_id = data.ConnectedJID
|
|
||||||
self.user_phone = "+" + data.ConnectedJID.split("@")[0]
|
|
||||||
self._connected.set_result(self.__get_connected_status_message())
|
|
||||||
elif event == whatsapp.EventLoggedOut:
|
elif event == whatsapp.EventLoggedOut:
|
||||||
self.logged = False
|
self.logged = False
|
||||||
|
self._connected.clear()
|
||||||
self.send_gateway_message(MESSAGE_LOGGED_OUT)
|
self.send_gateway_message(MESSAGE_LOGGED_OUT)
|
||||||
self.send_gateway_status("Logged out", show="away")
|
self.send_gateway_status("Logged out", show="away")
|
||||||
elif event == whatsapp.EventContact:
|
return
|
||||||
|
self.log.debug("Waiting for connected event for %s", data)
|
||||||
|
await self.xmpp.loop.run_in_executor(None, self._connected.wait)
|
||||||
|
self.log.debug("Connected! Processing %s", data)
|
||||||
|
if event == whatsapp.EventContact:
|
||||||
await self.contacts.add_whatsapp_contact(data.Contact)
|
await self.contacts.add_whatsapp_contact(data.Contact)
|
||||||
elif event == whatsapp.EventGroup:
|
elif event == whatsapp.EventGroup:
|
||||||
await self.bookmarks.add_whatsapp_group(data.Group)
|
await self.bookmarks.add_whatsapp_group(data.Group)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user