Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/tribler/core/database/orm_bindings/torrent_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,11 @@ def add_ffa_from_dict(cls: type[Self], metadata: dict) -> Self | None:
# two entries have different infohashes but the same id_. We do not want people to exploit this.
ih_blob = metadata["infohash"]
pk_blob = b""
if cls.exists(lambda g: (g.infohash == ih_blob) or (g.id_ == id_ and g.public_key == pk_blob)):
if results := cls.select(lambda g: (g.infohash == ih_blob) or (g.id_ == id_ and g.public_key == pk_blob)):
result = next((r for r in results if r.public_key == pk_blob), None)
# If the metainfo dict includes tracker_info, and the metadata in our db doesn't, update it.
if result and metadata.get("tracker_info") and not result.tracker_info:
result.tracker_info = metadata.get("tracker_info")
return None
if isinstance(metadata.get("tracker_info", ""), bytes):
metadata["tracker_info"] = metadata["tracker_info"].decode()
Expand Down
15 changes: 14 additions & 1 deletion src/tribler/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ def rescue_keys(config: TriblerConfigManager) -> None:
f.write(default_eccrypto.generate_key("curve25519").key_to_bin())


class IPv8Endpoint(DispatcherEndpoint):
"""
An DispatcherEndpoint that allows its preferred interface to be updated.
"""

def update_preferred_interface(self) -> None:
"""
Update the preferred interface.
"""
self._preferred_interface = self.interfaces[self.interface_order[0]] if self.interface_order else None


class Session:
"""
A session manager that manages all components.
Expand All @@ -106,14 +118,15 @@ def __init__(self, config: TriblerConfigManager) -> None:

# IPv8
self.rust_endpoint: RustEndpoint | None = None
dpep = DispatcherEndpoint([])
dpep = IPv8Endpoint([])
if ipv4ifs := [e for e in self.config.get("ipv8")["interfaces"] if e["interface"] == "UDPIPv4"]:
dpep.interfaces["UDPIPv4"] = self.rust_endpoint = RustEndpoint(ipv4ifs[0]["port"], ipv4ifs[0]["ip"],
ipv4ifs[0].get("worker_threads", 4))
dpep.interface_order.append("UDPIPv4")
if ipv6ifs := [e for e in self.config.get("ipv8")["interfaces"] if e["interface"] == "UDPIPv6"]:
dpep.interfaces["UDPIPv6"] = UDPv6Endpoint(ipv6ifs[0]["port"], ipv6ifs[0]["ip"])
dpep.interface_order.append("UDPIPv6")
dpep.update_preferred_interface()

rescue_keys(self.config)
self.ipv8 = IPv8(cast("dict[str, Any]", self.config.get("ipv8")), endpoint_override=dpep)
Expand Down
43 changes: 0 additions & 43 deletions src/tribler/core/tunnel/caches.py

This file was deleted.

145 changes: 37 additions & 108 deletions src/tribler/core/tunnel/community.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
from __future__ import annotations

import hashlib
import math
import time
from asyncio import CancelledError, open_connection, timeout
from binascii import hexlify, unhexlify
from collections import Counter
from typing import TYPE_CHECKING, cast

from ipv8.messaging.anonymization.community import unpack_cell
from ipv8.messaging.anonymization.hidden_services import HiddenTunnelCommunity, HiddenTunnelSettings
from ipv8.messaging.anonymization.tunnel import (
CIRCUIT_STATE_READY,
Expand All @@ -23,8 +20,6 @@

from tribler.core.libtorrent.download_manager.download_state import DownloadState, DownloadStatus
from tribler.core.notifier import Notification, Notifier
from tribler.core.tunnel.caches import HTTPRequestCache
from tribler.core.tunnel.payload import HTTPRequestPayload, HTTPResponsePayload

if TYPE_CHECKING:
from pathlib import Path
Expand All @@ -33,6 +28,8 @@
from ipv8.messaging.anonymization.payload import CreatedPayload, CreatePayload, ExtendedPayload
from ipv8.messaging.interfaces.endpoint import Endpoint
from ipv8.messaging.interfaces.udp.endpoint import Address
from ipv8.messaging.payload import IntroductionResponsePayload, NewIntroductionResponsePayload
from ipv8.messaging.payload_headers import GlobalTimeDistributionPayload
from ipv8.peer import Peer
from ipv8_rust_tunnels.endpoint import RustEndpoint

Expand All @@ -41,6 +38,7 @@

DESTROY_REASON_BALANCE = 65535
PEER_FLAG_EXIT_HTTP = 32768
PEER_FLAG_EXIT_BACKUP = 16384
MAX_HTTP_PACKET_SIZE = 1400


Expand Down Expand Up @@ -68,6 +66,7 @@ class TriblerTunnelSettings(HiddenTunnelSettings):
download_manager: DownloadManager
exitnode_enabled: bool = False
default_hops: int = 0
max_intro_points: int = 10


class TriblerTunnelCommunity(HiddenTunnelCommunity):
Expand Down Expand Up @@ -95,9 +94,6 @@ def __init__(self, settings: TriblerTunnelSettings) -> None:
self.download_states: dict[bytes, DownloadStatus] = {}
self.last_forced_announce: dict[bytes, float] = {}

self.add_cell_handler(HTTPRequestPayload, self.on_http_request)
self.add_cell_handler(HTTPResponsePayload, self.on_http_response)

if settings.exitnode_cache is not None:
self.register_task("Load cached exitnodes", self.restore_exitnodes_from_disk, delay=0.5)

Expand Down Expand Up @@ -143,6 +139,16 @@ def restore_exitnodes_from_disk(self) -> None:
else:
self.logger.warning("Could not retrieve backup exitnode cache, file does not exist!")

def get_candidates(self, *requested_flags: int) -> list[Peer]:
"""
Get all the peers that we can create circuits with. When requesting exits, prefer peers that aren't backups.
"""
candidates = super().get_candidates(*requested_flags)
if PEER_FLAG_EXIT_BT in requested_flags:
candidates = [c for c in candidates
if PEER_FLAG_EXIT_BACKUP not in self.candidates.get(c, [])] or candidates
return candidates

async def should_join_circuit(self, create_payload: CreatePayload, previous_node_address: Address) -> bool:
"""
Check whether we should join a circuit. Returns a future that fires with a boolean.
Expand Down Expand Up @@ -256,7 +262,8 @@ def on_raw_data(self, circuit: Circuit, origin: tuple[str, int], data: bytes) ->
"""
We have incoming data.
"""
self.logger.warning("Unexpected data packet in on_raw_data")
self.logger.warning("Unexpected data packet in on_raw_data for circuit %s with uptime=%s",
circuit.circuit_id, int(time.time()) - circuit.creation_time)

def monitor_downloads(self, dslist: list[DownloadState]) -> None:
"""
Expand Down Expand Up @@ -318,7 +325,12 @@ def monitor_hidden_swarms(self, new_states: dict[bytes, DownloadStatus], hops: d
"""
Update the known swarms based on the changed states.
"""
ip_counter = Counter([c.info_hash for c in list(self.circuits.values()) if c.ctype == CIRCUIT_TYPE_IP_SEEDER])
intro_points = [c for c in self.circuits.values() if c.ctype == CIRCUIT_TYPE_IP_SEEDER]
intro_points_todo = self.settings.max_intro_points - len(intro_points)
if intro_points_todo <= 0:
return

ip_counter = Counter([c.info_hash for c in intro_points])
for info_hash in set(list(new_states) + list(self.download_states)):
new_state = new_states.get(info_hash)
old_state = self.download_states.get(info_hash, None)
Expand All @@ -338,6 +350,8 @@ def on_join_swarm(addr: Address, ih: bytes = info_hash) -> None:
# Ensure we have enough introduction points for this infohash. Currently, we only create 1.
if new_state == DownloadStatus.SEEDING:
for _ in range(1 - ip_counter.get(info_hash, 0)):
if intro_points_todo <= 0:
return
self.logger.info("Create introducing circuit for %s", hexlify(info_hash))
self.create_introduction_point(info_hash)

Expand All @@ -363,6 +377,19 @@ def get_download(self, lookup_info_hash: bytes) -> Download | None:
return download
return None

def introduction_response_callback(self, peer: Peer, dist: GlobalTimeDistributionPayload,
payload: IntroductionResponsePayload | NewIntroductionResponsePayload) -> None:
"""
Try to discover peers that are behind a different port.
"""
if (payload.wan_introduction_address != ("0.0.0.0", 0)
and payload.lan_introduction_address != ("0.0.0.0", 0)
and payload.wan_introduction_address[0] != self.my_estimated_wan[0]):
addr = (payload.wan_introduction_address[0], payload.lan_introduction_address[1])
self.network.discover_address(peer, addr, self.community_id, payload.intro_supports_new_style)

return super().introduction_response_callback(peer, dist, payload)

@task
async def create_introduction_point(self, info_hash: bytes, required_ip: Peer | None = None) -> None:
"""
Expand Down Expand Up @@ -399,101 +426,3 @@ def get_lookup_info_hash(self, info_hash: bytes) -> bytes:
Get the SHA-1 hash to lookup for a given torrent info hash.
"""
return hashlib.sha1(b"tribler anonymous download" + hexlify(info_hash)).digest()

@unpack_cell(HTTPRequestPayload)
async def on_http_request(self, source_address: Address, payload: HTTPRequestPayload, # noqa: C901
circuit_id: int) -> None:
"""
Callback for when an HTTP request is received.
"""
if circuit_id not in self.exit_sockets:
self.logger.warning("Received unexpected http-request")
return
if len([cache for cache in self.request_cache._identifiers.values() # noqa: SLF001
if isinstance(cache, HTTPRequestCache) and cache.circuit_id == circuit_id]) > 5:
self.logger.warning("Too many HTTP requests coming from circuit %s")
return

self.logger.debug("Got http-request from %s", source_address)

writer = None
try:
async with timeout(10):
self.logger.debug("Opening TCP connection to %s", payload.target)
reader, writer = await open_connection(*payload.target)
writer.write(payload.request)
response = b""
while True:
line = await reader.readline()
response += line
if not line.strip():
# Read HTTP response body (1MB max)
response += await reader.read(1024 ** 2)
break
except OSError:
self.logger.warning("Tunnel HTTP request failed")
return
except TimeoutError:
self.logger.warning("Tunnel HTTP request timed out")
return
finally:
if writer:
writer.close()

if not response.startswith(b"HTTP/1.1 307"):
_, _, bencoded_data = response.partition(b'\r\n\r\n')

if not is_bencoded(bencoded_data):
self.logger.warning("Tunnel HTTP request not allowed")
return

num_cells = math.ceil(len(response) / MAX_HTTP_PACKET_SIZE)
for i in range(num_cells):
self.send_cell(source_address,
HTTPResponsePayload(circuit_id, payload.identifier, i, num_cells,
response[i * MAX_HTTP_PACKET_SIZE:(i + 1) * MAX_HTTP_PACKET_SIZE]))

@unpack_cell(HTTPResponsePayload)
def on_http_response(self, source_address: Address, payload: HTTPResponsePayload, circuit_id: int) -> None:
"""
Callback for when an HTTP response is received.
"""
cache = self.request_cache.get(HTTPRequestCache, payload.identifier)
if cache is None:
self.logger.warning("Received unexpected http-response")
return
if cache.circuit_id != payload.circuit_id:
self.logger.warning("Received http-response from wrong circuit %s != %s", cache.circuit_id, circuit_id)
return

self.logger.debug("Got http-response from %s", source_address)
if cache.add_response(payload):
self.request_cache.pop(HTTPRequestCache, payload.identifier)

async def perform_http_request(self, destination: Address, request: bytes, hops: int = 1) -> bytes:
"""
Perform the actual HTTP request to service the given request.
"""
# We need a circuit that supports HTTP requests, meaning that the circuit will have to end
# with a node that has the PEER_FLAG_EXIT_HTTP flag set.
circuit = None
circuits = self.find_circuits(exit_flags=[PEER_FLAG_EXIT_HTTP])
if circuits:
circuit = circuits[0]
else:
# Try to create a circuit. Attempt at most 3 times.
for _ in range(3):
circuit = self.create_circuit(hops, exit_flags=[PEER_FLAG_EXIT_HTTP])
if circuit and await circuit.ready:
break

if not circuit or circuit.state != CIRCUIT_STATE_READY:
msg = "No HTTP circuit available"
raise RuntimeError(msg)

cache = self.request_cache.add(HTTPRequestCache(self, circuit.circuit_id))
if cache is not None:
self.send_cell(circuit.hop.address, HTTPRequestPayload(circuit.circuit_id, cache.number, destination,
request))
return await cache.response_future
raise CancelledError
68 changes: 0 additions & 68 deletions src/tribler/test_unit/core/tunnel/test_caches.py

This file was deleted.

Loading