• 3 Posts
  • 18 Comments
Joined 20 days ago
cake
Cake day: January 1st, 2025

help-circle








  • With this script, can you get the list of subscribers?

    I don’t think the Lemmy API exposes the subscriber list of a community; for that you’ll need access to the instance database.

    What I did was check every post from the last 365 days for user activity and store every user that has interacted with the community.

    lemmy_session and lemmy_references are the same as for the LiveThreadBot (inside the src folder): https://gitlab.com/UlrikHD/lemmy-match-thread-bot

    get_posts() may be missing from the lemmy_session.py file, though; if it is, add this method:

        def get_posts(self, *, community: int | str | LemmyCommunity, sort: str = 'New', page: int = 1) -> dict[str, any]:
            """ Fetches one page of posts from a community.

            The request is sent through ``self.srv.get_posts`` with a fixed limit of 50 posts per page.

            :param community: The community to list. A string or LemmyCommunity is forwarded as
            ``community_name``; any other value is forwarded as ``community_id``.
            :param sort: The sort order requested for the posts, 'New' unless specified.
            :param page: Which page of results to request, 1 unless specified.
            :return: The JSON body of the response, decoded into a dictionary.
            :raises requests.exceptions.HTTPError: If the response status code is anything other than 200.
            """
            # Pick the keyword the community is sent under, then issue a single request.
            by_name: bool = isinstance(community, (LemmyCommunity, str))
            selector: str = 'community_name' if by_name else 'community_id'
            response: Final[requests.Response] = self.srv.get_posts(**{selector: community}, sort=sort, page=page,
                                                                    limit=50)
            if response.status_code == 200:
                return response.json()
            # Anything but 200 is reported with the raw response body as the error message.
            raise requests.exceptions.HTTPError(response.text)
    

    Excuse the ugly code; it was written as a one-off.

    import datetime
    import os
    import time
    from json import load, dump
    from typing import Any

    import requests

    from lemmy_references import LemmyCommunity, LemmyUser
    from lemmy_session import LemmySession
    
    
    session: LemmySession = LemmySession(website='https://lemmy.world/',
                                         username='TestUlrikHD',
                                         password='---',
                                         end_script_signal=None)

    # Step 1: page through the community's posts (get_posts sorts by 'New' by default) and keep
    # every post published within the last 365 days.
    community: LemmyCommunity = LemmyCommunity('football', 'lemmy.world')
    posts: list[dict[str, Any]] = []
    cutoff_date: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=365)
    page_count: int = 1
    reached_cutoff: bool = False
    while not reached_cutoff:
        post_response: dict[str, Any] = session.get_posts(community=community, page=page_count)
        page_count += 1
        if not post_response['posts']:
            # An empty page means the community has no more posts; without this check the loop
            # would never end for a community with less than 365 days of history.
            break
        for post in post_response['posts']:
            if datetime.datetime.fromisoformat(post['post']['published']) > cutoff_date:
                posts.append(post)
            elif post['post'].get('featured_community'):
                # NOTE(review): Lemmy appears to list community-pinned posts first regardless of
                # the sort order, so an old pinned post must not end the scan - confirm against
                # the API. Skipping it is harmless either way.
                continue
            else:
                # Posts are newest-first, so everything after this one is older than the cutoff.
                reached_cutoff = True
                break

    # Step 2: record every user that created one of those posts or commented on one. The stored
    # IDs tell a later reply where to go: the post itself, or the user's comment ('parent_id').
    user_dict: dict[str, dict[str, Any]] = {}
    for post in posts:
        user_dict[str(LemmyUser(post['creator']['actor_id']))] = {'post': True, 'post_id': post['post']['id']}
        comments = session.get_post_comments(post_id=post['post']['id'])
        for comment in comments['comments']:
            user: str = str(LemmyUser(comment['creator']['actor_id']))
            if user not in user_dict:
                user_dict[user] = {'post': False, 'post_id': comment['post']['id'], 'parent_id': comment['comment']['id']}

    # Step 3: drop the bot accounts. pop() with a default avoids a KeyError when a bot never
    # posted or commented in the scanned period.
    for bot in ('FootballAutoMod@lemmy.world', 'LiveThreadBot@lemmy.world'):
        user_dict.pop(str(LemmyUser(bot)), None)
    with open('user_dict', 'w', encoding='utf-8') as file:
        dump(user_dict, file, ensure_ascii=False, indent=4)
    
    
    def log_reply(usr: str) -> None:
        """ Records a user in ``reply_list.json``.

        The file holds a JSON list. Any list already stored there is loaded, ``str(usr)`` is appended
        to it, and the whole list is written back. When the file does not exist yet, the list starts
        out empty.

        :param usr: The entry to add to the list.
        """
        log_path = 'reply_list.json'
        if not os.path.isfile(log_path):
            logged: list[str] = []
        else:
            with open(log_path, 'r', encoding='utf-8') as existing:
                logged = load(existing)
        logged.append(str(usr))
        # Rewrite the complete list so the file is always a single valid JSON document.
        with open(log_path, 'w', encoding='utf-8') as out:
            dump(logged, out, ensure_ascii=False, indent=4)
    
    
    # Leave False for a dry run that only builds reply_list.json; set to True to actually post a
    # reply to every collected user. This replaces commenting the reply calls in and out by hand.
    SEND_REPLIES: bool = False
    MIGRATION_MESSAGE: str = 'migration message'

    for username, user in user_dict.items():
        try:
            if SEND_REPLIES:
                # Presumably a rate limit between requests - only needed when something is sent.
                time.sleep(1)
                # Entries created from a post carry no 'parent_id', so .get() yields None and the
                # reply goes to the post itself; otherwise it goes to the user's comment.
                session.reply(content=MIGRATION_MESSAGE, post_id=user['post_id'],
                              parent_id=user.get('parent_id'))
        except requests.HTTPError as e:
            print(f'Failed to send message to {username} - {e}')
        else:
            # Only users whose reply did not fail end up in the list.
            log_reply(usr=LemmyUser(username).str_link())
    
    

    And this part creates .txt files for easy copy-pasting when tagging users.

    from json import load
    
    with open('reply_list.json', 'r', encoding='utf-8') as file:
        user_list: list[str] = load(file)

    # Split the users into as many equally sized groups as needed for the joined text of each
    # group to average at most 9500 characters (presumably to stay below a comment length
    # limit - confirm). Groups are split by user count, not by text length.
    loop_count: int = len(', '.join(user_list)) // 9500 + 1
    for i in range(loop_count):
        start: int = i * len(user_list) // loop_count
        end: int = (i + 1) * len(user_list) // loop_count
        chunk: str = ', '.join(user_list[start:end])
        # Report the length of exactly what is written, separators included.
        print(len(chunk))
        with open(f'reply_list_{i}.txt', 'w', encoding='utf-8') as file:
            file.write(chunk)