Page direct-message history backwards so old conversations load
This commit is contained in:
@@ -65,17 +65,69 @@ export function useDirectMessages() {
|
||||
|
||||
// Both directions of every conversation we're part of: messages we sent
|
||||
// (authors = self) and messages addressed to us (#p = self).
|
||||
const events = await nostr.query(
|
||||
[
|
||||
{ kinds: [DM_KIND], authors: [self] },
|
||||
{ kinds: [DM_KIND], '#p': [self] },
|
||||
],
|
||||
{ signal },
|
||||
);
|
||||
|
||||
// Dedupe by id (a self-sent message can match both filters).
|
||||
//
|
||||
// We stream with `req()` rather than `query()` and wait for *every*
|
||||
// relay to reach EOSE. `query()` resolves on the pool's short EOSE
|
||||
// timeout (the first relay to finish), which silently drops DMs that
|
||||
// only live on slower relays — one cause of "missing conversations".
|
||||
//
|
||||
// Relays return newest-first and cap each REQ at `limit`, so a single
|
||||
// fetch truncates long histories — the oldest conversations (years back)
|
||||
// fall off the end. We page backwards with `until`: after each full page
|
||||
// we lower `until` to the oldest event seen and ask again, stopping once
|
||||
// a page comes back short (nothing older remains). An overall timeout
|
||||
// keeps a stalled relay from hanging the page.
|
||||
const byId = new Map<string, NostrEvent>();
|
||||
for (const event of events) byId.set(event.id, event);
|
||||
const PAGE_SIZE = 500;
|
||||
const MAX_PAGES = 40; // hard ceiling: up to ~20k DMs
|
||||
|
||||
const timeout = AbortSignal.timeout(15000);
|
||||
const controller = new AbortController();
|
||||
const onAbort = () => controller.abort();
|
||||
signal?.addEventListener('abort', onAbort);
|
||||
timeout.addEventListener('abort', onAbort);
|
||||
|
||||
try {
|
||||
let until: number | undefined;
|
||||
for (let page = 0; page < MAX_PAGES; page++) {
|
||||
const base = { kinds: [DM_KIND], limit: PAGE_SIZE };
|
||||
const window = until === undefined ? {} : { until };
|
||||
let pageCount = 0;
|
||||
let oldest = Infinity;
|
||||
|
||||
for await (const msg of nostr.req(
|
||||
[
|
||||
{ ...base, ...window, authors: [self] },
|
||||
{ ...base, ...window, '#p': [self] },
|
||||
],
|
||||
{ signal: controller.signal },
|
||||
)) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
const event = msg[2];
|
||||
// Dedupe by id (a self-sent message can match both filters,
|
||||
// and overlapping `until` windows can re-deliver the boundary).
|
||||
if (!byId.has(event.id)) pageCount++;
|
||||
byId.set(event.id, event);
|
||||
if (event.created_at < oldest) oldest = event.created_at;
|
||||
} else if (msg[0] === 'EOSE') {
|
||||
// The pool emits a single EOSE once all routed relays are done.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// A short page means relays had nothing older to give — we're done.
|
||||
// (Two filters at PAGE_SIZE each could yield up to 2*PAGE_SIZE.)
|
||||
if (pageCount < PAGE_SIZE || oldest === Infinity) break;
|
||||
// Step the window back. `until` is inclusive, so subtract 1 to avoid
|
||||
// re-fetching the exact boundary event forever.
|
||||
until = oldest - 1;
|
||||
}
|
||||
} catch {
|
||||
// Abort (unmount/timeout) — fall through with whatever we collected.
|
||||
} finally {
|
||||
signal?.removeEventListener('abort', onAbort);
|
||||
timeout.removeEventListener('abort', onAbort);
|
||||
}
|
||||
|
||||
// Group by counterparty pubkey.
|
||||
const byPeer = new Map<string, NostrEvent[]>();
|
||||
|
||||
Reference in New Issue
Block a user