Render DM inbox fast, then backfill slower relays
Use the shared pooled query for the initial messages page so the inbox keeps Ditto-style fast first paint with the provider's short eoseTimeout. Page 0 is marked pending backfill, then a background effect queries each configured read relay individually, merges those slower results into the same infinite-query cache entry, and unlocks older-page pagination after backfill completes. Also update conversation merging so a backfilled newer event updates the row's latest preview and sort order.
This commit is contained in:
+193
-75
@@ -1,5 +1,5 @@
|
||||
import { useNostr } from '@nostrify/react';
|
||||
import { useMemo } from 'react';
|
||||
import { useEffect, useMemo } from 'react';
|
||||
import { useInfiniteQuery, useMutation, useQuery, useQueryClient, type InfiniteData } from '@tanstack/react-query';
|
||||
import type { NostrEvent, NostrFilter, NostrSigner } from '@nostrify/nostrify';
|
||||
|
||||
@@ -46,6 +46,7 @@ interface DirectMessagesPage {
|
||||
conversations: Conversation[];
|
||||
sentUntil: number | null;
|
||||
receivedUntil: number | null;
|
||||
backfillComplete: boolean;
|
||||
}
|
||||
|
||||
interface DirectMessagesCursor {
|
||||
@@ -58,6 +59,103 @@ function recipientOf(event: NostrEvent): string | undefined {
|
||||
return event.tags.find(([name]) => name === 'p')?.[1];
|
||||
}
|
||||
|
||||
function buildDirectMessageFilters(self: string, cursor: DirectMessagesCursor): NostrFilter[] {
|
||||
const filters: NostrFilter[] = [];
|
||||
if (cursor.sentUntil !== null) {
|
||||
filters.push({
|
||||
kinds: [DM_KIND],
|
||||
authors: [self],
|
||||
limit: PAGE_SIZE,
|
||||
...(cursor.sentUntil === undefined ? {} : { until: cursor.sentUntil }),
|
||||
});
|
||||
}
|
||||
if (cursor.receivedUntil !== null) {
|
||||
filters.push({
|
||||
kinds: [DM_KIND],
|
||||
'#p': [self],
|
||||
limit: PAGE_SIZE,
|
||||
...(cursor.receivedUntil === undefined ? {} : { until: cursor.receivedUntil }),
|
||||
});
|
||||
}
|
||||
return filters;
|
||||
}
|
||||
|
||||
async function directMessagesPageFromEvents({
|
||||
events,
|
||||
self,
|
||||
nip04,
|
||||
backfillComplete,
|
||||
}: {
|
||||
events: NostrEvent[];
|
||||
self: string;
|
||||
nip04: NonNullable<NostrSigner['nip04']>;
|
||||
backfillComplete: boolean;
|
||||
}): Promise<DirectMessagesPage> {
|
||||
const sentEvents = events.filter((event) => event.pubkey === self);
|
||||
const receivedEvents = events.filter((event) => recipientOf(event) === self);
|
||||
const nextSentUntil = getNextUntil(sentEvents);
|
||||
const nextReceivedUntil = getNextUntil(receivedEvents);
|
||||
|
||||
const byPeer = new Map<string, NostrEvent[]>();
|
||||
for (const event of events) {
|
||||
const outgoing = event.pubkey === self;
|
||||
const peer = outgoing ? recipientOf(event) : event.pubkey;
|
||||
if (!peer) continue;
|
||||
const list = byPeer.get(peer) ?? [];
|
||||
list.push(event);
|
||||
byPeer.set(peer, list);
|
||||
}
|
||||
|
||||
const conversations: Conversation[] = [];
|
||||
for (const [peer, peerEvents] of byPeer) {
|
||||
peerEvents.sort((a, b) => a.created_at - b.created_at);
|
||||
|
||||
const latestEvent = peerEvents[peerEvents.length - 1];
|
||||
if (!latestEvent) continue;
|
||||
conversations.push({
|
||||
peer,
|
||||
events: peerEvents,
|
||||
messageCount: peerEvents.length,
|
||||
latest: await decryptMessage({ event: latestEvent, peer, self, nip04 }),
|
||||
});
|
||||
}
|
||||
|
||||
conversations.sort((a, b) => b.latest.createdAt - a.latest.createdAt);
|
||||
return { conversations, sentUntil: nextSentUntil, receivedUntil: nextReceivedUntil, backfillComplete };
|
||||
}
|
||||
|
||||
function mergeDirectMessagesPage(base: DirectMessagesPage, incoming: DirectMessagesPage): DirectMessagesPage {
|
||||
const byPeer = new Map<string, Conversation>();
|
||||
|
||||
for (const conversation of [...base.conversations, ...incoming.conversations]) {
|
||||
const existing = byPeer.get(conversation.peer);
|
||||
if (!existing) {
|
||||
byPeer.set(conversation.peer, { ...conversation, events: [...conversation.events] });
|
||||
continue;
|
||||
}
|
||||
|
||||
const seen = new Set(existing.events.map((event) => event.id));
|
||||
for (const event of conversation.events) {
|
||||
if (!seen.has(event.id)) {
|
||||
existing.events.push(event);
|
||||
seen.add(event.id);
|
||||
}
|
||||
}
|
||||
existing.events.sort((a, b) => a.created_at - b.created_at);
|
||||
existing.messageCount = existing.events.length;
|
||||
if (conversation.latest.createdAt > existing.latest.createdAt) {
|
||||
existing.latest = conversation.latest;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
conversations: [...byPeer.values()].sort((a, b) => b.latest.createdAt - a.latest.createdAt),
|
||||
sentUntil: incoming.sentUntil,
|
||||
receivedUntil: incoming.receivedUntil,
|
||||
backfillComplete: incoming.backfillComplete,
|
||||
};
|
||||
}
|
||||
|
||||
/** True if the signer can perform NIP-04 encryption. */
|
||||
export function useHasDmSupport(): boolean {
|
||||
const { user } = useCurrentUser();
|
||||
@@ -70,26 +168,25 @@ export function useHasDmSupport(): boolean {
|
||||
* conversations sorted by most-recent activity. Full threads decrypt lazily in
|
||||
* `useDirectMessageThread` after the user selects a conversation.
|
||||
*
|
||||
* Messages are read from the app's configured relays. Rather than going
|
||||
* through the shared pool (whose `eoseTimeout` resolves a query as soon as the
|
||||
* *first* relay sends EOSE — which dropped conversations held only by slower
|
||||
* relays and produced a different, incomplete count on every refresh), this
|
||||
* fans the query out to each read relay individually and merges the results.
|
||||
* Each per-relay `query()` resolves on that relay's own EOSE, so every relay
|
||||
* gets to deliver its events (bounded by an overall 8s cap). NIP-04 leaks
|
||||
* metadata and is deprecated in favor of NIP-44/NIP-17; this exists for interop
|
||||
* with clients that still send kind-4 DMs.
|
||||
* The first page uses the shared pool so it renders quickly with the app's
|
||||
* normal `eoseTimeout`. A background backfill then queries each read relay
|
||||
* individually and merges those slower results into the same cache entry. That
|
||||
* preserves a fast inbox while still filling conversations that only exist on a
|
||||
* slower relay. NIP-04 leaks metadata and is deprecated in favor of NIP-44/NIP-17;
|
||||
* this exists for interop with clients that still send kind-4 DMs.
|
||||
*/
|
||||
export function useDirectMessages() {
|
||||
const { nostr } = useNostr();
|
||||
const { user } = useCurrentUser();
|
||||
const { config } = useAppContext();
|
||||
const queryClient = useQueryClient();
|
||||
const self = user?.pubkey;
|
||||
|
||||
const readRelays = useMemo(
|
||||
() => config.relayMetadata.relays.filter((r) => r.read).map((r) => r.url),
|
||||
() => [...new Set(config.relayMetadata.relays.filter((r) => r.read).map((r) => r.url))].sort(),
|
||||
[config.relayMetadata.relays],
|
||||
);
|
||||
const queryKey = useMemo(() => ['direct-messages', self, readRelays] as const, [self, readRelays]);
|
||||
|
||||
const query = useInfiniteQuery<
|
||||
DirectMessagesPage,
|
||||
@@ -98,38 +195,19 @@ export function useDirectMessages() {
|
||||
readonly unknown[],
|
||||
DirectMessagesCursor
|
||||
>({
|
||||
queryKey: ['direct-messages', self, readRelays],
|
||||
queryKey,
|
||||
enabled: !!self && !!user?.signer.nip04,
|
||||
initialPageParam: {},
|
||||
queryFn: async ({ signal, pageParam }) => {
|
||||
if (!self || !user?.signer.nip04) {
|
||||
return { conversations: [], sentUntil: null, receivedUntil: null };
|
||||
return { conversations: [], sentUntil: null, receivedUntil: null, backfillComplete: true };
|
||||
}
|
||||
const nip04 = user.signer.nip04;
|
||||
|
||||
// Ditto's feed paginates with oldest-timestamp cursors instead of loading
|
||||
// the full history up front. DMs need separate cursors for sent and
|
||||
// received filters so one high-volume direction does not skip the other.
|
||||
const filters: NostrFilter[] = [];
|
||||
if (pageParam.sentUntil !== null) {
|
||||
filters.push({
|
||||
kinds: [DM_KIND],
|
||||
authors: [self],
|
||||
limit: PAGE_SIZE,
|
||||
...(pageParam.sentUntil === undefined ? {} : { until: pageParam.sentUntil }),
|
||||
});
|
||||
}
|
||||
if (pageParam.receivedUntil !== null) {
|
||||
filters.push({
|
||||
kinds: [DM_KIND],
|
||||
'#p': [self],
|
||||
limit: PAGE_SIZE,
|
||||
...(pageParam.receivedUntil === undefined ? {} : { until: pageParam.receivedUntil }),
|
||||
});
|
||||
}
|
||||
const filters = buildDirectMessageFilters(self, pageParam);
|
||||
|
||||
if (filters.length === 0) {
|
||||
return { conversations: [], sentUntil: null, receivedUntil: null };
|
||||
return { conversations: [], sentUntil: null, receivedUntil: null, backfillComplete: true };
|
||||
}
|
||||
|
||||
const timeout = AbortSignal.timeout(8000);
|
||||
@@ -140,7 +218,16 @@ export function useDirectMessages() {
|
||||
|
||||
const byId = new Map<string, NostrEvent>();
|
||||
try {
|
||||
if (readRelays.length > 0) {
|
||||
const isInitialPage = pageParam.sentUntil === undefined && pageParam.receivedUntil === undefined;
|
||||
if (isInitialPage || readRelays.length === 0) {
|
||||
// Fast path: use the pooled query and let the provider's short
|
||||
// eoseTimeout render the inbox immediately. Page 0 is backfilled by
|
||||
// the effect below before older-page pagination is enabled.
|
||||
const events = await nostr.query(filters, { signal: controller.signal });
|
||||
for (const event of events) {
|
||||
byId.set(event.id, event);
|
||||
}
|
||||
} else {
|
||||
// Query each read relay individually and merge. Going through the
|
||||
// shared pool resolves on the first relay's EOSE (+ a short
|
||||
// eoseTimeout), which silently drops DMs held only by slower relays.
|
||||
@@ -157,12 +244,6 @@ export function useDirectMessages() {
|
||||
byId.set(event.id, event);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No read relays configured — fall back to the default pool.
|
||||
const events = await nostr.query(filters, { signal: controller.signal });
|
||||
for (const event of events) {
|
||||
byId.set(event.id, event);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Abort (unmount/timeout) — fall through with whatever we collected.
|
||||
@@ -171,47 +252,81 @@ export function useDirectMessages() {
|
||||
timeout.removeEventListener('abort', onAbort);
|
||||
}
|
||||
|
||||
const pageEvents = [...byId.values()];
|
||||
const sentEvents = pageEvents.filter((event) => event.pubkey === self);
|
||||
const receivedEvents = pageEvents.filter((event) => recipientOf(event) === self);
|
||||
const nextSentUntil = getNextUntil(sentEvents);
|
||||
const nextReceivedUntil = getNextUntil(receivedEvents);
|
||||
|
||||
// Group by counterparty pubkey.
|
||||
const byPeer = new Map<string, NostrEvent[]>();
|
||||
for (const event of pageEvents) {
|
||||
const outgoing = event.pubkey === self;
|
||||
const peer = outgoing ? recipientOf(event) : event.pubkey;
|
||||
if (!peer) continue;
|
||||
const list = byPeer.get(peer) ?? [];
|
||||
list.push(event);
|
||||
byPeer.set(peer, list);
|
||||
}
|
||||
|
||||
const conversations: Conversation[] = [];
|
||||
for (const [peer, peerEvents] of byPeer) {
|
||||
peerEvents.sort((a, b) => a.created_at - b.created_at);
|
||||
|
||||
const latestEvent = peerEvents[peerEvents.length - 1];
|
||||
if (!latestEvent) continue;
|
||||
conversations.push({
|
||||
peer,
|
||||
events: peerEvents,
|
||||
messageCount: peerEvents.length,
|
||||
latest: await decryptMessage({ event: latestEvent, peer, self, nip04 }),
|
||||
});
|
||||
}
|
||||
|
||||
// Most-recently-active conversations first.
|
||||
conversations.sort((a, b) => b.latest.createdAt - a.latest.createdAt);
|
||||
return { conversations, sentUntil: nextSentUntil, receivedUntil: nextReceivedUntil };
|
||||
const isInitialPage = pageParam.sentUntil === undefined && pageParam.receivedUntil === undefined;
|
||||
return directMessagesPageFromEvents({
|
||||
events: [...byId.values()],
|
||||
self,
|
||||
nip04,
|
||||
backfillComplete: !isInitialPage || readRelays.length === 0,
|
||||
});
|
||||
},
|
||||
getNextPageParam: (lastPage) => {
|
||||
if (!lastPage.backfillComplete) return undefined;
|
||||
if (lastPage.sentUntil === null && lastPage.receivedUntil === null) return undefined;
|
||||
return { sentUntil: lastPage.sentUntil, receivedUntil: lastPage.receivedUntil };
|
||||
},
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (!self || !user?.signer.nip04 || readRelays.length === 0) return;
|
||||
const firstPage = query.data?.pages[0];
|
||||
if (!firstPage || firstPage.backfillComplete) return;
|
||||
|
||||
const nip04 = user.signer.nip04;
|
||||
const filters = buildDirectMessageFilters(self, {});
|
||||
const controller = new AbortController();
|
||||
const timeout = AbortSignal.timeout(12_000);
|
||||
const onAbort = () => controller.abort();
|
||||
timeout.addEventListener('abort', onAbort);
|
||||
|
||||
void (async () => {
|
||||
const byId = new Map<string, NostrEvent>();
|
||||
try {
|
||||
const perRelay = await Promise.allSettled(
|
||||
readRelays.map((url) =>
|
||||
nostr.relay(url).query(filters, { signal: controller.signal }),
|
||||
),
|
||||
);
|
||||
for (const result of perRelay) {
|
||||
if (result.status !== 'fulfilled') continue;
|
||||
for (const event of result.value) {
|
||||
byId.set(event.id, event);
|
||||
}
|
||||
}
|
||||
|
||||
const incoming = await directMessagesPageFromEvents({
|
||||
events: [...byId.values()],
|
||||
self,
|
||||
nip04,
|
||||
backfillComplete: true,
|
||||
});
|
||||
|
||||
queryClient.setQueryData<InfiniteData<DirectMessagesPage, DirectMessagesCursor>>(queryKey, (data) => {
|
||||
if (!data?.pages[0] || data.pages[0].backfillComplete) return data;
|
||||
const pages = [...data.pages];
|
||||
pages[0] = mergeDirectMessagesPage(pages[0], incoming);
|
||||
return { ...data, pages };
|
||||
});
|
||||
} catch {
|
||||
if (!controller.signal.aborted) {
|
||||
queryClient.setQueryData<InfiniteData<DirectMessagesPage, DirectMessagesCursor>>(queryKey, (data) => {
|
||||
if (!data?.pages[0] || data.pages[0].backfillComplete) return data;
|
||||
const pages = [...data.pages];
|
||||
pages[0] = { ...pages[0], backfillComplete: true };
|
||||
return { ...data, pages };
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
timeout.removeEventListener('abort', onAbort);
|
||||
}
|
||||
})();
|
||||
|
||||
return () => {
|
||||
timeout.removeEventListener('abort', onAbort);
|
||||
controller.abort();
|
||||
};
|
||||
}, [nostr, query.data?.pages, queryClient, queryKey, readRelays, self, user?.signer.nip04]);
|
||||
|
||||
const conversations = useMemo(() => mergeConversationPages(query.data), [query.data]);
|
||||
|
||||
return { ...query, data: conversations, pageCount: query.data?.pages.length ?? 0 };
|
||||
@@ -245,6 +360,9 @@ function mergeConversationPages(data: { pages: DirectMessagesPage[] } | undefine
|
||||
}
|
||||
existing.events.sort((a, b) => a.created_at - b.created_at);
|
||||
existing.messageCount = existing.events.length;
|
||||
if (conversation.latest.createdAt > existing.latest.createdAt) {
|
||||
existing.latest = conversation.latest;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user